co.paralleluniverse/pulsar

0.7.7-SNAPSHOT


A Clojure lightweight thread, asynchronous programming, and actor library

dependencies

org.clojure/clojure
1.8.0
co.paralleluniverse/quasar-core
unquotequasar-version
co.paralleluniverse/quasar-actors
unquotequasar-version
org.ow2.asm/asm
5.1
org.clojure/core.match
0.2.2
org.flatland/useful
0.11.5
gloss
0.2.6



(this space intentionally left almost blank)
 
(ns co.paralleluniverse.pulsar.actors-test
  (:use midje.sweet
        co.paralleluniverse.pulsar.core
        co.paralleluniverse.pulsar.actors)
;  (:require [co.paralleluniverse.pulsar.lazyseq :as s :refer [channel->lazy-seq]])
  (:refer-clojure :exclude [promise await bean])
  (:import [java.util.concurrent TimeUnit TimeoutException ExecutionException]
           [co.paralleluniverse.common.util Debug]
           [co.paralleluniverse.strands Strand]
           [co.paralleluniverse.fibers Fiber]
           (co.paralleluniverse.actors LocalActor)))

actors

(fact "The spawn macro will evaluate arguments by value"
      (let [a (spawn #(do
                       (spawn (fn [parent] (! parent :something)) @self)
                       (receive [m] :something m :after 1000 nil)))]
        (join a))
      => :something)
(fact "When actor throws exception then join throws it"
      (let [actor (spawn #(throw (Exception. "my exception")))]
        (join actor))
      => (throws Exception "my exception"))
(fact "When actor returns a value then join returns it"
      (let [actor (spawn #(+ 41 1))]
        (join actor))
      => 42)
(fact "actor-receive"
      (fact "Test simple actor send/receive"
            (let [actor (spawn #(receive))]
              (! actor :abc)
              (join actor)) => :abc)
      (fact "Test receive after sleep"
            (let [actor
                  (spawn #(let [m1 (receive)
                                m2 (receive)]
                            (+ m1 m2)))]
              (! actor 13)
              (Thread/sleep 200)
              (! actor 12)
              (join actor)) => 25)
      (fact "When simple receive and timeout then return nil"
            (let [actor
                  (spawn #(let [m1 (receive-timed 50)
                                m2 (receive-timed 50)
                                m3 (receive-timed 50)]
                           [m1 m2 m3]))]
              (! actor 1)
              (Thread/sleep 20)
              (! actor 2)
              (Thread/sleep 100)
              (! actor 3)
              (fact (.isFiber (LocalActor/getStrand actor)) => true)
              (join actor) => [1 2 nil]))
      (fact "When simple receive (on thread) and timeout then return nil"
            (let [actor
                  (spawn
                    :scheduler :thread
                    #(let [m1 (receive-timed 50)
                                m2 (receive-timed 50)
                                m3 (receive-timed 50)]
                            [m1 m2 m3]))]
              (! actor 1)
              (Thread/sleep 20)
              (! actor 2)
              (Thread/sleep 100)
              (! actor 3)
              (fact (.isFiber (LocalActor/getStrand actor)) => false)
              (join actor) => [1 2 nil])))
(fact "matching-receive"
      (fact "Test actor matching receive 1"
            (let [actor (spawn
                          #(receive
                             :abc "yes!"
                             :else "oy"))]
              (! actor :abc)
              (join actor)) => "yes!")
      (fact "Test actor matching receive 2"
            (let [actor (spawn
                          #(receive
                             :abc "yes!"
                             [:why? answer] answer
                             :else "oy"))]
              (! actor [:why? "because!"])
              (join actor)) => "because!")
      (fact "Test actor matching receive 3"
            (let [res (atom [])
                  actor (spawn
                          #(receive
                            [x y] (+ x y)))]
              (! actor [2 3])
              (join actor)) => 5)
      (fact "When matching receive and timeout then run :after clause"
            (let [actor
                  (spawn
                    #(receive
                       [:foo] nil
                       :else (println "got it!")
                       :after 30 :timeout))]
              (Thread/sleep 150)
              (! actor 1)
              (join actor)) => :timeout))
(fact "selective-receive"
      (fact "Test selective receive1"
            (let [res (atom [])
                  actor (spawn
                          #(dotimes [i 2]
                             (receive
                               [:foo x] (do
                                          (swap! res conj x)
                                          (receive
                                            [:baz z] (swap! res conj z)))
                               [:bar y] (swap! res conj y)
                               [:baz z] (swap! res conj z))))]
              (! actor [:foo 1])
              (! actor [:bar 2])
              (! actor [:baz 3])
              (join actor)
              @res) => [1 3 2])
      (fact "Test selective ping pong"
            (let [actor1 (spawn
                           #(receive
                             [from m] (! from @self (str m "!!!")))) ; same as (! from [@self (str m "!!!")])
                  actor2 (spawn
                           (fn []
                             (! actor1 @self (receive)) ; same as (! actor1 [@self (receive)])
                             (receive
                               [actor1 res] res)))]
              (! actor2 "hi")
              (join actor2)) => "hi!!!"))
(facts "actor-link"
       (fact "When an actor dies, its link gets an exception"
             (let [actor1 (spawn #(Fiber/sleep 100))
                   actor2 (spawn
                            #(try
                              (loop [] (receive [m] :foo :bar) (recur))
                              (catch co.paralleluniverse.actors.LifecycleException e
                                true)))]
               (link! actor1 actor2)
               (fact (.isFiber (LocalActor/getStrand actor1)) => true)
               (join actor1)
               (join actor2)) => true)
       (fact "When an actor (on a thread) dies, its link gets an exception"
             (let [actor1 (spawn :scheduler :thread
                                 #(Strand/sleep 100))
                   actor2 (spawn
                            :scheduler :thread
                            #(try
                              (loop [] (receive [m] :foo :bar) (recur))
                              (catch co.paralleluniverse.actors.LifecycleException e
                                true)))]
               (link! actor1 actor2)
               (fact (.isFiber (LocalActor/getStrand actor1)) => false)
               (join actor1)
               (join actor2)) => true)
       (fact "When an actor dies and lifecycle-handler is defined, then it gets a message"
             (let [actor1 (spawn #(Fiber/sleep 100))
                   actor2 (spawn :lifecycle-handler #(! @self [:foo (first %)])
                                 #(try
                                    (loop [] (receive [m]
                                                      [:foo x] x
                                                      :else (recur)))
                                    (catch co.paralleluniverse.actors.LifecycleException e nil)))]
               (link! actor1 actor2)
               (join actor1)
               (join actor2)) => :exit)
       (fact "When an actor dies, and its link traps with :trap, then its link gets a message"
             (let [actor1 (spawn #(Fiber/sleep 100))
                   actor2 (spawn :trap true
                                 (fn []
                                   (link! actor1)
                                   (receive [m]
                                            [:exit _ actor reason] actor)))]
               (join actor1)
               (fact (join actor2) => actor1)))
       (fact "When an actor dies, and its link traps with trap!, then its link gets a message"
             (let [actor1 (spawn #(Fiber/sleep 100))
                   actor2 (spawn (fn []
                                   (trap!)
                                   (link! actor1)
                                   (receive [m]
                                            [:exit _ actor reason] actor)))]
               (join actor1)
               (fact (join actor2) => actor1))))
(fact "actor-watch"
      (fact "When an actor dies, its watch gets a message"
            (let [actor1 (spawn #(Fiber/sleep 200))
                  actor2 (spawn
                           (fn []
                             (let [w (watch! actor1)]
                               (receive
                                 [:exit w actor reason] actor))))]
              (join actor1)
              (join actor2) => actor1))
      (fact "When an actor dies, its watch gets a message2"
            (let [actor1 (spawn #(Fiber/sleep 200))
                  actor2 (spawn
                           (fn []
                             (let [w (watch! actor1)]
                               (receive
                                 [:exit w actor reason] actor))))]
              (join actor1)
              (fact (join actor2) => actor1)))
      (fact "When an actor dies, its watch gets a message3"
            (let [actor1 (spawn #(Fiber/sleep 200))
                  actor2 (spawn
                           (fn []
                               (let [w (watch! actor1)]
                                    (receive
                                      [:exit w actor reason] actor
                                      :else "fail"))))]
                 (join actor1)
                 (fact (join actor2) => actor1))))
(facts "actor-state"
       (fact "Test recur actor-state"
             (let [actor
                   (spawn #(loop [i (int 2)
                                  state (int 0)]
                             (if (== i 0)
                               state
                               (recur (dec i) (+ state (int (receive)))))))]
               (! actor 13)
               (! actor 12)
               (join actor)) => 25)
       (fact "Test simple actor-state"
             (let [actor
                   (spawn #(do
                             (set-state! 0)
                             (set-state! (+ @state (receive)))
                             (set-state! (+ @state (receive)))
                             @state))]
               (! actor 13)
               (! actor 12)
               (join actor)) => 25)
       (fact "Test primitive actor-state"
             (let [actor (spawn (actor [^int sum 0]
                                       (set! sum (int (+ sum (receive))))
                                       (set! sum (int (+ sum (receive))))
                                       sum))]
               (! actor 13)
               (! actor 12)
               (join actor)) => 25))
(defsfn f1 []
  (inc (receive)))
(defsfn f2 [x]
  (+ x (receive)))
(defactor a1 []
  (inc (receive)))
(defactor a2 [^double x]
  (+ x (receive)))
(facts "spawn-syntax"
       (fact "Test spawn inline function"
             (let [actor
                   (spawn #(inc (receive)))]
               (! actor 41)
               (join actor)) => 42)
       (fact "Test spawn simple function"
             (let [actor
                   (spawn f1)]
               (! actor 41)
               (join actor)) => 42)
       (fact "Test spawn function with args"
             (let [actor
                   (spawn f2 5)]
               (! actor 37)
               (join actor)) => 42)
       (fact "Test spawn simple actor"
             (let [actor
                   (spawn a1)]
               (! actor 41)
               (join actor)) => 42)
       (fact "Test spawn simple actor with constructor args"
             (let [actor
                   (spawn a2 3.4)]
               (! actor 38.6)
               (join actor)) => 42.0))

(facts "mailbox-seq" (fact "Send and receive sequence (via @mailbox)" (let [actor (spawn #(doall (take 5 (channel->lazy-seq @mailbox))))] (snd-seq (mailbox-of actor) (take 10 (range))) (join actor)) => '(0 1 2 3 4)) (fact "Map received sequence (via @mailbox)" (let [actor (spawn (fn [] (doall (map #(* % %) (take 5 (channel->lazy-seq @mailbox))))))] (snd-seq (mailbox-of actor) (take 10 (range))) (join actor)) => '(0 1 4 9 16)) (fact "Filter received sequence (via @mailbox)" (let [actor (spawn #(s/doall (filter even? (take 5 (channel->lazy-seq @mailbox)))))] (snd-seq (mailbox-of actor) (take 10 (range))) (join actor)) => '(0 2 4)))

(fact "strampoline-test"
      (fact "Test trampolining actor"
            (let [state2 (sfn []
                                (receive
                                  :bar :foobar))
                  state1 (sfn []
                                (receive
                                  :foo state2))
                  actor (spawn (fn []
                                 (strampoline state1)))]
              (! actor :foo)
              (Thread/sleep 50)
              (! actor :bar)
              (join actor)) => :foobar)
      (fact "Test trampolining actor with selective receive"
            (let [state2 (sfn []
                                (receive
                                  :bar :foobar))
                  state1 (sfn []
                                (receive
                                  :foo state2))
                  actor (spawn (fn []
                                 (strampoline state1)))]
              (! actor :bar)
              (Thread/sleep 50)
              (! actor :foo)
              (join actor)) => :foobar))

gen-server

(fact "When gen-server starts then init is called"
      (let [called (atom false)
            gs (spawn
                 (gen-server (reify Server
                               (init [_]
                                     (reset! called true)
                                     (shutdown!))
                               (terminate [_ cause]))))]
        (join gs)
        @called) => true)
(fact "When no messages then handle-timeout is called"
      (let [times (atom 0)
            gs (spawn
                 (gen-server :timeout 20
                             (reify Server
                               (init [_])
                               (handle-timeout [_]
                                               (if (< @times 5)
                                                 (swap! times inc)
                                                 (shutdown!)))
                               (terminate [_ cause]))))]
        (fact
          (join 50 :ms gs) => (throws TimeoutException))
        (join 200 :ms gs)
        @times) => 5)
(fact "Server dies on shutdown"
      (let [times (atom 0)
            gs (spawn
                 (gen-server (reify Server
                               (init [_])
                               (terminate [_ cause]
                                          (fact cause => nil)))))]
        (fact
          (join 50 :ms gs) => (throws TimeoutException))
        (shutdown! gs)
        (join gs)) => nil)
(facts "gen-server call!"
       (fact "When gen-server call! then result is returned"
             (let [gs (spawn
                        (gen-server (reify Server
                                      (init [_])
                                      (terminate [_ cause])
                                      (handle-call [_ from id [a b]]
                                                   (+ a b)))))]
               (call! gs 3 4) => 7))
       (fact "When gen-server call! then result is returned (with sleep)"
             (let [gs (spawn
                        (gen-server (reify Server
                                      (init [_])
                                      (terminate [_ cause])
                                      (handle-call [_ from id [a b]]
                                        (Strand/sleep 50)
                                        (+ a b)))))]
               (fact (.isFiber (LocalActor/getStrand gs)) => true)
               (call! gs 3 4) => 7))
       (fact "When gen-server (on thread) call! then result is returned (with sleep)"
             (let [gs (spawn
                        :scheduler :thread
                        (gen-server (reify Server
                                      (init [_])
                                      (terminate [_ cause])
                                      (handle-call [_ from id [a b]]
                                        (Strand/sleep 50)
                                        (+ a b)))))]
               (fact (.isFiber (LocalActor/getStrand gs)) => false)
               (call! gs 3 4) => 7))
       (fact "When gen-server call! from fiber then result is returned"
             (let [gs (spawn
                        (gen-server (reify Server
                                      (init [_])
                                      (terminate [_ cause])
                                      (handle-call [_ from id [a b]]
                                                   (+ a b)))))
                   fib (spawn-fiber #(call! gs 3 4))]
               (join fib) => 7))
       (fact "When gen-server call! from fiber then result is returned (with sleep)"
             (let [gs (spawn
                        (gen-server (reify Server
                                      (init [_])
                                      (terminate [_ cause])
                                      (handle-call [_ from id [a b]]
                                                   (Strand/sleep 50)
                                                   (+ a b)))))
                   fib (spawn-fiber #(call! gs 3 4))]
               (join fib) => 7))
       (fact "When gen-server call! from actor then result is returned"
             (let [gs (spawn (gen-server
                               (reify Server
                                 (init [_])
                                 (terminate [_ cause])
                                 (handle-call [_ from id [a b]]
                                              (+ a b)))))
                   actor (spawn #(call! gs 3 4))]
               (join actor) => 7))
       (fact "When handle-call throws exception then call! throws it"
             (let [gs (spawn
                        (gen-server (reify Server
                                      (init [_])
                                      (terminate [_ cause])
                                      (handle-call [_ from id [a b]]
                                                   (throw (Exception. "oops!"))))))]
               (call! gs 3 4) => (throws Exception "oops!"))))
(fact "when gen-server doesn't respond then timeout"
      (let [gs (spawn
                 (gen-server (reify Server
                               (init [_])
                               (terminate [_ cause])
                               (handle-call [_ from id [a b]]
                                            (Strand/sleep 100)
                                            (+ a b)))))]
        (call-timed! gs 10 :ms 3 4) => (throws TimeoutException)))
(fact "when reply! is called return value in call!"
      (let [gs (spawn
                 (gen-server :timeout 50
                             (reify Server
                               (init [_]
                                     (set-state! {}))
                               (terminate [_ cause])
                               (handle-call [_ from id [a b]]
                                            (set-state! (assoc @state :a a :b b :from from :id id))
                                            nil)
                               (handle-timeout [_]
                                               (let [{:keys [a b from id]} @state]
                                                 (when id
                                                   (reply! from id (+ a b))
                                                   (set-state! nil)))))))]
        (fact
          (call-timed! gs 10 :ms 3 4) => (throws TimeoutException))
        (fact
          (call-timed! gs 100 :ms 5 6) => 11)
        (shutdown! gs)))
(fact "When cast! then handle-cast is called"
      (let [res (atom nil)
            gs (spawn
                 (gen-server (reify Server
                               (init [_])
                               (terminate [_ cause])
                               (handle-cast [_ from id [a b]]
                                            (reset! res (+ a b))))))]
        (cast! gs 3 4)
        (shutdown! gs)
        (join gs)
        @res => 7))
(fact "Messages sent to a gen-server are passed to handle-info"
      (let [res (atom nil)
            gs (spawn
                 (gen-server (reify Server
                               (init [_])
                               (terminate [_ cause])
                               (handle-info [_ m]
                                            (reset! res m)))))]
        (! gs :hi)
        (shutdown! gs)
        (join gs)
        @res => :hi))
(fact "When handle-info throws exception, terminate is called with the cause"
      (let [gs (spawn
                 (gen-server (reify Server
                               (init [_])
                               (terminate [_ cause]
                                          (fact cause => (every-checker #(instance? Exception %) #(= (.getMessage ^Exception %) "oops!"))))
                               (handle-info [_ m]
                                            (throw (Exception. "oops!"))))))]
        (! gs :hi)
        (join gs)) => (throws Exception "oops!"))
(fact "Test receive in handle-call"
      (co.paralleluniverse.common.util.Debug/dumpAfter 5000 "foo.log")
      (let [actor (spawn #(receive
                           [from m] (! from @self (str m "!!!"))))
            gs (spawn
                 (gen-server (reify Server
                               (init [_])
                               (terminate [_ cause])
                               (handle-call [_ from id [a b]]
                                 (! actor @self (+ a b))
                                 (receive
                                   [actor m] m)))))]
        (call! gs 3 4)) => "7!!!")

gen-event

(unfinished handler1 handler2)
(fact "When notify gen-event then call blocking handlers"
      (let [ge (spawn (gen-event))]
        (add-handler! ge (fn [& _] (Fiber/sleep 10)))
        (notify! ge "hello")
        (shutdown! ge)
        (join ge)) => nil)
(fact "When notify gen-event then call handlers"
      (let [ge (spawn (gen-event
                        #(add-handler! @self handler1)))]
        (add-handler! ge handler2)
        (notify! ge "hello")
        (shutdown! ge)
        (join ge)) => nil
      (provided 
        (handler1 "hello") => nil
        (handler2 "hello") => nil))
(fact "When handler is removed then don't call it"
      (let [ge (spawn (gen-event
                        #(add-handler! @self handler1)))]
        (add-handler! ge handler2)
        (Strand/sleep 50)
        (remove-handler! ge handler1)
        (notify! ge "hello")
        (shutdown! ge)
        (join ge)) => nil
      (provided 
        (handler1 anything) => irrelevant :times 0
        (handler2 "hello") => nil))

gen-fsm

(unfinished handler1 handler2)
(fact "test gen-fsm"
      (let [x (atom 0)]
        (letfn [(state1 []
                        (receive
                          :a (do
                               (swap! x inc)
                               state2)))
                (state2 []
                        (receive
                          :b (do
                               (swap! x inc)
                               :done)))]
          (let [gfsm (spawn (gen-fsm state1))]
            (! gfsm :b)
            (! gfsm :qqq)  ; ignore
            (! gfsm :a)
            (join gfsm)
            @x))) => 2)

supervisor

This is cumbersome, but we don't normally obtain an actor from a supervisor, but rather manually add a known actor to a supervisor, so this ugly function is only useful for tests anyway.

(defn- sup-child
  [sup id timeout]
  (when (pos? timeout)
    (let [a (get-child sup id)]
      (if (and a (not (done? a)))
        a
        (do
          (Thread/sleep 10)
          (recur sup id (- timeout 10)))))))
(defsfn actor1 []
  (loop [i (int 0)]
    (receive
      [:shutdown a] i
      :else (recur (inc i)))))
(defsfn bad-actor1 []
  (receive)
  (throw (RuntimeException. "Ha!")))
(fact "child-modes"
      (fact "When permanent actor dies of natural causes then restart"
            (let [sup (spawn
                        (supervisor :one-for-one
                                    #(list ["actor1" :permanent 5 1 :sec 10 actor1])))]
              (doseq [res [3 5]]
                (let [a (sup-child sup "actor1" 200)]
                  (dotimes [i res]
                    (! a :hi!))
                  (! a :shutdown nil)
                  (fact
                    (join a) => res)))
              (shutdown! sup)
              (join sup)))
      (fact "When permanent actor dies of un-natural causes then restart"
            (let [sup (spawn
                        (supervisor :one-for-one
                                    #(list ["actor1" :permanent 5 1 :sec 10 bad-actor1])))]
              (dotimes [i 2]
                (let [a (sup-child sup "actor1" 300)]
                  (! a :hi!)
                  (fact
                    (join a) => throws Exception)))
              (shutdown! sup)
              (join sup)))
      (fact "When transient actor dies of natural causes then don't restart"
            (let [sup (spawn
                        (supervisor :one-for-one
                                    #(list ["actor1" :transient 5 1 :sec 10 actor1])))]
              (let [a (sup-child sup "actor1" 200)]
                (dotimes [i 3]
                  (! a :hi!))
                (! a :shutdown nil)
                (fact
                  (join a) => 3))
              (fact (sup-child sup "actor1" 200) => nil)
              (shutdown! sup)
              (join sup)))
      (fact "When transient actor dies of un-natural causes then restart"
            (let [sup (spawn
                        (supervisor :one-for-one
                                    #(list ["actor1" :transient 5 1 :sec 10 bad-actor1])))]
              (dotimes [i 2]
                (let [a (sup-child sup "actor1" 200)]
                  (! a :hi!)
                  (fact
                    (join a) => throws Exception)))
              (shutdown! sup)
              (join sup)))
      (fact "When temporary actor dies of natural causes then don't restart"
            (let [sup (spawn
                        (supervisor :one-for-one
                                    #(list ["actor1" :temporary 5 1 :sec 10 actor1])))]
              (let [a (sup-child sup "actor1" 200)]
                (dotimes [i 3]
                  (! a :hi!))
                (! a :shutdown nil)
                (fact
                  (join a) => 3))
              (fact (sup-child sup "actor1" 200) => nil)
              (shutdown! sup)
              (join sup)))
      (fact "When temporary actor dies of un-natural causes then don't restart"
            (let [sup (spawn
                        (supervisor :one-for-one
                                    #(list ["actor1" :temporary 5 1 :sec 10 bad-actor1])))]
              (let [a (sup-child sup "actor1" 200)]
                (! a :hi!)
                (fact
                  (join a) => throws Exception))
              (fact (sup-child sup "actor1" 200) => nil)
              (shutdown! sup)
              (join sup))))
(fact "When a child dies too many times then give up and die"
      (let [sup (spawn
                  (supervisor :one-for-one
                              #(list ["actor1" :permanent 3 1 :sec 10 bad-actor1])))]
        (dotimes [i 4]
          (let [a (sup-child sup "actor1" 200)]
            (! a :hi!)
            (fact
              (join a) => throws Exception)))
        (join sup)))
(defsfn actor3
  [sup started terminated]
  (let [adder
        (add-child! sup nil :temporary 5 1 :sec 10
                   (gen-server ; or (spawn (gen-server...))
                               (reify Server
                                 (init        [_]       (swap! started inc))
                                 (terminate   [_ cause] (swap! terminated inc))
                                 (handle-call [_ from id [a b]]
                                              (log :debug "=== a: {} b: {}" a b)
                                              (let [res (+ a b)]
                                                (if (> res 100)
                                                  (throw (Exception. "oops!"))
                                                  res))))))
        a (receive)
        b (receive)]
    (call! adder a b)))
(fact :complex "Complex example test1"
      (let [prev       (atom nil)
            started    (atom 0)
            terminated (atom 0)
            sup (spawn
                  (supervisor :all-for-one
                              #(list ["actor1" :permanent 5 1 :sec 10
                                      :name "koko" ; ... and any other optional parameter accepted by spawn
                                      actor3 @self started terminated])))]
        (let [a (sup-child sup "actor1" 200)]
          (! a 3)
          (! a 4)
          (fact :complex
            (join a) => 7)
          (reset! prev a))
        (let [a (sup-child sup "actor1" 200)]
          (fact :complex
            (identical? a @prev) => false)
          (! a 70)
          (! a 80)
          (fact :complex
            (join a) => throws Exception)
          (reset! prev a))
        (let [a (sup-child sup "actor1" 200)]
          (fact (identical? a @prev) => false)
          (! a 7)
          (! a 8)
          (fact :complex
            (join a) => 15))
        (Strand/sleep 2000) ; give the actor time to start the gen-server
        (shutdown! sup)
        (join sup)
        [@started @terminated]) => [4 4])
 
(ns co.paralleluniverse.pulsar.auto-instrumentation-test
  (:use midje.sweet
        co.paralleluniverse.pulsar.core)
  (:refer-clojure :exclude [promise await bean])
  (:import [co.paralleluniverse.fibers Fiber]
           (java.io InputStream)))
(defn stcktrc [] #_(.printStackTrace (Exception.) (System/err)))
(defn dbug [s] #_(.println System/err s))
(defprotocol P1
  (foo ^Integer [this]))
(defprotocol P2
  (bar-me ^Integer [this ^Integer y]))
(deftype T [^Integer a ^Integer b ^Integer c]
  P1
  (foo [_] (dbug "T.foo before sleep") (stcktrc) (Fiber/sleep 100) (dbug "T.foo after sleep") a))
(extend-type T
  P2
  (bar-me [this y] (dbug "T.bar-me before sleep") (stcktrc) (Fiber/sleep 100) (dbug "T.bar-me after sleep") (+ (.c this) y)))
(defrecord R [^Integer a ^Integer b ^Integer c]
  P1
  (foo [_] (dbug "R.foo before sleep") (stcktrc) (Fiber/sleep 100) (dbug "R.foo after sleep") b))
(extend-type R
  P2
  (bar-me [this y] (dbug "R.bar-me before sleep") (stcktrc) (Fiber/sleep 100) (dbug "R.bar-me after sleep") (* (:c this) y)))
(def a
  (reify
    P1
    (foo [_] (dbug "a.foo before sleep") (stcktrc) (Fiber/sleep 100) (dbug "a.foo after sleep") 17)
    P2
    (bar-me [_ y] (dbug "a.bar-me before sleep") (stcktrc) (Fiber/sleep 100) (dbug "a.bar-me after sleep") y)))
(def px (proxy [InputStream] [] (read [] (dbug "px.read before sleep") (stcktrc) (Fiber/sleep 100) (dbug "px.read after sleep") -1)))
(def t (->T 1 2 3))
(def r (->R 3 2 1))
(defmulti area :Shape)
(defn rect [wd ht] {:Shape :Rect :wd wd :ht ht})
(defn circle [radius] {:Shape :Circle :radius radius})
(defmethod area :Rect [r]
  (dbug "area :Rect before sleep") (stcktrc) (Fiber/sleep 100) (dbug "area :Rect after sleep") (* (:wd r) (:ht r)))
(defmethod area :Circle [c]
  (dbug "area :Circle before sleep") (stcktrc) (Fiber/sleep 100) (dbug "area :Circle after sleep") (* (. Math PI) (* (:radius c) (:radius c))))
(defmethod area :default [x] :oops)
(def rect (rect 4 13))
(def circle (circle 12))
(defn simple-fun [] (dbug "simple-fun before sleep") (stcktrc) (Fiber/sleep 100) (dbug "simple-fun after sleep") 17)
(defn res []
  (join
    (spawn-fiber
      (fn []
        [(foo t)
         (bar-me t 5)
         (foo r)
         (bar-me r 56)
         (foo a)
         (bar-me a 45)
         (.read px)
         (area rect)
         (area circle)
         (simple-fun)]))))
(if (= (System/getProperty "co.paralleluniverse.pulsar.instrument.auto") "all")
  (do
    (fact "reduce-test"
          (let [action-fn #(reduce (sfn [acc v]
                                        (conj acc (join (fiber (inc v)))))
                                   [] (range 20))]
            (join (fiber (action-fn)))) => [1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20])
    (fact "Clojure language features work with auto-instrumentation" :auto-instrumentation
          (res) => [1 8 2 56 17 45 -1 52 452.3893421169302 17])))
 
(ns co.paralleluniverse.pulsar.core-test
  (:use midje.sweet
        co.paralleluniverse.pulsar.core)
  (:require [midje.checking.core :as checking])
  (:refer-clojure :exclude [promise await])
  (:import [java.util.concurrent TimeUnit TimeoutException ExecutionException]
           [co.paralleluniverse.common.util Debug]
           [co.paralleluniverse.strands Strand]
           [co.paralleluniverse.fibers Fiber]))

fibers

(fact "fiber-timeout"
      (fact "When join and timeout then throw exception"
            (let [fib (spawn-fiber #(Fiber/park 100 TimeUnit/MILLISECONDS))]
              (join 2 :ms fib))
            => (throws TimeoutException))
      (fact "When join and no timeout then join"
            (let [fib (spawn-fiber
                        #(do
                           (Fiber/park 100 TimeUnit/MILLISECONDS)
                           15))]
              (join 200 :ms fib))
            => 15))
(fact "When fiber throws exception then join throws that exception"
      (let [fib (spawn-fiber #(throw (Exception. "my exception")))]
        (join fib))
      => (throws Exception "my exception"))
(fact "simple test"
      (let [f (fn [a b] (Fiber/sleep 20) (+ a b))
            fib (spawn-fiber f 3 4)]
        (join fib))
      => 7)
(fact "When fiber interrupted while sleeping then InterruptedException thrown"
      (let [fib (spawn-fiber
                  #(try
                     (sleep 100)
                     false
                     (catch InterruptedException e
                       true)))]
        (sleep 20)
        (.interrupt fib)
        (join fib)) => true)
(def ^:dynamic *foo* 40)
(facts "fiber-bindings"
       (fact "Fiber inherits thread bindings"
             (let [fiber
                   (binding [*foo* 20]
                     (spawn-fiber
                       #(let [v1 *foo*]
                          (sleep 200)
                          (let [v2 *foo*]
                            (+ v1 v2)))))]
               (join fiber))
             => 40)
       (fact "Bindings declared in fiber last throughout fiber lifetime"
             (let [fiber
                   (spawn-fiber
                     #(binding [*foo* 15]
                        (let [v1 *foo*]
                          (sleep 200)
                          (let [v2 *foo*]
                            (+ v1 v2)))))]
               (join fiber))
             => 30))
(fact "fiber->future can be used to turn a fiber into a future"
      (let [fiber (spawn-fiber
                    (fn []
                      (sleep 20)
                      42))
            fut (fiber->future fiber)]
        (fact @fut => 42)
        (fact (realized? fut) => true)))
(fact "fiber can be used to execute a whole block in a newly created fiber"
      (let [fiber (fiber
                    (sleep 20)
                    0)]
        (fact (join fiber) => 0)))
(fact "await blocks the fiber and returns the value passed to the callback"
      (let [exec (java.util.concurrent.Executors/newSingleThreadExecutor)
            service (fn [a b clbk]
                      (.execute exec ^Runnable (fn []
                                                 (sleep 50)
                                                 (clbk (+ a b)))))
            fiber (spawn-fiber
                    (fn []
                      (await service 2 5)))]
        (join fiber) => 7))

channels

(fact "Test channel close"
      (let [ch (channel)
            fiber (spawn-fiber
                    (fn []
                      (let [m1 (rcv ch)
                            m2 (rcv ch)
                            m3 (rcv ch)
                            m4 (rcv ch)]
                        (fact
                          (closed? ch) => true)
                        (list m1 m2 m3 m4))))]
        (sleep 20)
        (snd ch "m1")
        (sleep 20)
        (snd ch "m2")
        (sleep 20)
        (snd ch "m3")
        (close! ch)
        (snd ch "m4")
        (join fiber))  => '("m1" "m2" "m3" nil))
(fact "Test multi-consumer channels"
      (let [ch (channel 0 :block true true)
            f #(rcv ch)
            fiber1 (spawn-fiber f)
            fiber2 (spawn-fiber f)]
        (snd ch "m1")
        (snd ch "m2")
        (close! ch)
        #{(join fiber1) (join fiber2)}  => #{"m1" "m2"}))
(fact "Test snd-seq and rcv-into"
      (let [ch (channel)
            fiber (spawn-fiber #(rcv-into [] ch 1000))]
        (snd-seq ch (range 5))
        (close! ch)
        (join fiber)) => [0 1 2 3 4])
(facts "promises-promises"
       (fact "When try to set promise twice, then return nil and the first value wins"
             (let [v (promise)]
               [(deliver v "hi!")
                (deliver v "bye!")
                (deref v)] => [v nil "hi!"]))
       (fact "This complex promises test passes"
             (let [v1 (promise)
                   v2 (promise)
                   v3 (promise)
                   v4 (promise)
                   f1 (spawn-fiber  #(deliver v2 (+ @v1 1)))
                   t1 (spawn-thread #(deliver v3 (+ @v1 @v2)))
                   f2 (spawn-fiber  #(deliver v4 (+ @v3 @v2)))]
               (sleep 50)
               (deliver v1 1)
               (join [f1 f2 t1])
               (fact
                 (mapv realized? [v1 v2 v3 v4]) => [true true true true])
               @v4) => 5)
       (fact "Test promise functions"
             (let [v0 (promise)
                   v1 (promise)
                   v2 (promise #(+ @v1 1))
                   v3 (promise #(+ @v1 @v2))
                   v4 (promise #(* (+ @v3 @v2) @v0))]
               (sleep 50)
               (deliver v1 1)
               (fact @v3 => 3)
               (fact
                 (mapv realized? [v0 v1 v2 v3 v4]) => [false true true true false])
               (deliver v0 2)
               @v4) => 10))
(facts "topics"
       (fact "When channel subscribes to topic then it receives its messages"
             (let [ch1 (channel)
                   ch2 (channel)
                   ch3 (channel)
                   topic (topic)
                   f1 (spawn-fiber #(list (rcv ch1) (rcv ch1) (rcv ch1)))
                   f2 (spawn-fiber #(list (rcv ch2) (rcv ch2) (rcv ch2)))
                   f3 (spawn-fiber #(list (rcv ch3) (rcv ch3) (rcv ch3)))]
               (subscribe! topic ch1)
               (subscribe! topic ch2)
               (subscribe! topic ch3)
               (sleep 20)
               (snd topic "m1")
               (sleep 20)
               (snd topic "m2")
               (sleep 20)
               (snd topic "m3")
               (fact 
                 (join f1) => '("m1" "m2" "m3"))
               (fact 
                 (join f2) => '("m1" "m2" "m3"))
               (fact 
                 (join f3) => '("m1" "m2" "m3"))))
       (fact "When channel unsubscribes from topic then it stops receiving messages"
             (let [ch1 (channel)
                   ch2 (channel)
                   ch3 (channel)
                   topic (topic)
                   f1 (spawn-fiber #(list (rcv ch1 1000 :ms) (rcv ch1 1000 :ms) (rcv ch1 1000 :ms)))
                   f2 (spawn-fiber #(list (rcv ch2 1000 :ms) (rcv ch2 1000 :ms) (rcv ch2 1000 :ms)))
                   f3 (spawn-fiber #(list (rcv ch3 1000 :ms) (rcv ch3 1000 :ms) (rcv ch3 1000 :ms)))]
               (subscribe! topic ch1)
               (subscribe! topic ch2)
               (subscribe! topic ch3)
               ;(sleep 5)
               (snd topic "m1")
               (sleep 5)
               (snd topic "m2")
               (unsubscribe! topic ch2)
               (sleep 5)
               (snd topic "m3")
               (fact 
                 (join f1) => '("m1" "m2" "m3"))
               (fact 
                 (join f2) => '("m1" "m2" nil))
               (fact 
                 (join f3) => '("m1" "m2" "m3")))))
(defn gt? [y]
  #(or (> % y)  (checking/as-data-laden-falsehood {:notes [(str "actual " % " required > " y)]})))
(fact "When multiple consumers receive from ticker-channel then each consumer's messages are monotonic"
      (let [ch (channel 10 :displace)
            task (sfn [] 
                        (let [ch (ticker-consumer ch)]
                          (loop [prev -1]
                            (let [m (rcv ch)]
                              (when m
                                (assert (> m prev)) ;(fact m => (gt? prev))
                                (recur (long m)))))))
            f1 (spawn-fiber task)
            t1 (spawn-thread task)
            f2 (spawn-fiber task)
            t2 (spawn-thread task)
            f3 (spawn-fiber task)
            t3 (spawn-thread task)
            f4 (spawn-fiber task)
            t4 (spawn-thread task)]
        (sleep 100)
        (dotimes [i 1000]
          (sleep 1)
          (snd ch i))
        (close! ch)
        (join (list f1 t1 f2 t2 f3 t3 f4 t4))
        :ok) => :ok)
(defn fan-in [ins size]
  (let [c (channel size)]
    (spawn-fiber #(while true
                    (let [[x] (sel ins)]
                      (snd c x))))
    c))
(defn fan-out [in cs-or-n]
  (let [cs (if (number? cs-or-n)
             (doall (repeatedly cs-or-n channel))
             (doall cs-or-n))]
    (spawn-fiber (fn []
                   (while true
                     (let [x (rcv in)
                           outs (map #(vector % x) cs)]
                       (sel outs)))))
    cs))
(facts :selected "select"
       #_(Debug/dumpAfter 5000 "sel.log")
       (fact "basic sel test"
             (let [cout (channel 0) ;;
                   cin (fan-in (fan-out cout (repeatedly 3 channel)) 0)
                   f (spawn-fiber #(loop [n (int 0)
                                          res []]
                                     (if (< n 10)
                                        (do 
                                          (snd cout n)
                                          (recur (inc n) (conj res (rcv cin))))
                                        res)))]
               (join f) => (vec (range 10))))
       (fact :selected "another sel test"
             (let [n 20
                   cout (channel 1) ;;
                   cin (fan-in (fan-out cout (repeatedly n #(channel 1))) 1)]
               (dotimes [i n]
                 (snd cout i))
               (sort (repeatedly n #(rcv cin))) => (range n)))
       (fact "test select with timeout"
             (let [c (channel)]
               (select :timeout 0 
                       c ([v] v) 
                       :else "timeout!")) => "timeout!")
       (fact "test select with timeout2"
             (let [c (channel)]
               (select :timeout 100 
                       c ([v] v) 
                       :else "timeout!")) => "timeout!")
       (fact "test select with timeout3"
             (let [c (channel)
                   f (spawn-fiber #(snd c 10))]
               (select :timeout 100 
                       c ([v] (inc v)) 
                       :else "timeout!")) => 11))
(declare skynet)
(defsfn subnet [num size div]
  (loop [i 0
         children []]
    (if (= i div)
      children
      (recur
        (+ i 1)
        (conj children (fiber (skynet (int (+ num (* i (/ size div))))
                                      (int (/ size div))
                                      div)))))))
(defsfn skynet [num size div]
  (if (= size 1)
    num
    (->> (subnet num size div)
         doall
         join ;;comment if not fiber
         (reduce +))))
(fact "skynet"
  (let [res (skynet 0 100 10)]
    res) => 4950)
 
(ns co.paralleluniverse.pulsar.dataflow-test
  (:refer-clojure :exclude [promise await])
  (:use midje.sweet
        co.paralleluniverse.pulsar.core)
  (:require [co.paralleluniverse.pulsar.dataflow :refer :all]
            [midje.checking.core :as checking]))
(facts "vals"
       (fact "When try to set val twice, then throw exception"
             (let [v (df-val)]
               (v "hi!")
               (fact (v "bye!") => (throws IllegalStateException))
               @v => "hi!"))
       (fact "This complex val test passes"
             (let [v1 (df-val)
                   v2 (df-val)
                   v3 (df-val)
                   v4 (df-val)
                   f1 (spawn-fiber  #(v2 (+ @v1 1)))
                   t1 (spawn-thread #(v3 (+ @v1 @v2)))
                   f2 (spawn-fiber  #(v4 (+ @v3 @v2)))]
               (sleep 50)
               (v1 1)
               (join [f1 f2 t1])
               (fact
                 (mapv realized? [v1 v2 v3 v4]) => [true true true true])
               @v4) => 5)
       (fact "Test val functions"
             (let [v0 (df-val)
                   v1 (df-val)
                   v2 (df-val #(+ @v1 1))
                   v3 (df-val #(+ @v1 @v2))
                   v4 (df-val #(* (+ @v3 @v2) @v0))]
               (sleep 50)
               (v1 1)
               (fact @v3 => 3)
               (fact
                 (mapv realized? [v0 v1 v2 v3 v4]) => [false true true true false])
               (v0 2)
               @v4) => 10))
(facts "vars"
       (fact "Block until var is set for the first time"
             (let [x (df-var)

                   f (fiber
                       (sleep 50)
                       (x 45))]
               @x) => 45)
       (fact "When no history, return last"
             (let [x (df-var)]
               (x 1)
               (x 2)
               (x 3)
               @x) => 3)
       (fact "When history, return history"
             (let [x (df-var 10)]
               (x 1)
               (x 2)
               (x 3)
               (fact @x => 1)
               (fact @x => 2)
               (fact @x => 3))))
(fact "vals and vars"
      (let [res (atom [])
            a (df-val)
            x (df-var)
            y (df-var #(* @a @x))
            z (df-var #(+ @a @x))
            r (df-var #(let [v (+ @a @y @z)]
                        (swap! res conj v)))
            f (fiber
                (loop [i 0]
                  (when (< i 5)
                    (sleep 50)
                    (x i)
                    (recur (inc i)))))]
        (sleep 10)
        (a 3)
        (join f)
        (sleep 100)
        (last @r)) => 22)
 
(ns co.paralleluniverse.pulsar.lazyseq-test
  (:use midje.sweet
        co.paralleluniverse.pulsar.core)
  (:require [co.paralleluniverse.pulsar.lazyseq :as s :refer [channel->lazy-seq]]
            [midje.checking.core :as checking])
  (:refer-clojure :exclude [promise await])
  (:import [java.util.concurrent TimeUnit TimeoutException ExecutionException]
           [co.paralleluniverse.common.util Debug]
           [co.paralleluniverse.strands Strand]
           [co.paralleluniverse.fibers Fiber]))

(fact "Receive sequence with sleep" (let [ch (channel -1) fiber (spawn-fiber #(let [s (take 5 (channel->lazy-seq ch))] (doall s)))] (dotimes [m 10] (sleep 200) (snd ch m)) (join fiber)) => '(0 1 2 3 4))

(fact "Map received sequence with sleep" (let [ch (channel -1) fiber (spawn-fiber (fn [] (doall (map #(* % %) (take 5 (channel->lazy-seq ch))))))] (dotimes [m 10] (sleep 200) (snd ch m)) (join fiber)) => '(0 1 4 9 16))

(fact "Filter received sequence with sleep (odd)" (let [ch (channel -1) fiber (spawn-fiber #(doall (filter odd? (take 5 (channel->lazy-seq ch)))))] (dotimes [m 10] (sleep 200) (snd ch m)) (let [res (join fiber)] (co.paralleluniverse.common.util.Debug/dumpRecorder) res) => '(1 3)))

_(fact "Filter received sequence with sleep (even)"

 (let [ch (channel -1)
       fiber (spawn-fiber
               #(doall (filter even? (take 5 (channel->lazy-seq ch)))))]
   (dotimes [m 10]
     (sleep 200)
     (snd ch m))
   (join fiber)) => '(0 2 4))

_(fact "Filter and map received sequence with sleep (even)"

 (let [ch (channel -1)
       fiber (spawn-fiber
               (fn [] (doall
                        (filter #(> % 10)
                                  (map #(* % %)
                                         (filter even?
                                                   (take 5 (channel->lazy-seq ch))))))))]
   (dotimes [m 10]
     (sleep 200)
     (snd ch m))
   (join fiber)) => '(16))

(fact "Send and receive sequence" (let [ch (channel -1) fiber (spawn-fiber #(doall (take 5 (channel->lazy-seq ch))))] (snd-seq ch (take 10 (range))) (join fiber)) => '(0 1 2 3 4))

(fact "Map received sequence" (let [ch (channel -1) fiber (spawn-fiber (fn [] (doall (map #(* % %) (take 5 (channel->lazy-seq ch))))))] (snd-seq ch (take 10 (range))) (join fiber)) => '(0 1 4 9 16))

(fact "Filter received sequence" (let [ch (channel -1) fiber (spawn-fiber #(doall (filter even? (take 5 (channel->lazy-seq ch)))))] (snd-seq ch (take 10 (range))) (join fiber)) => '(0 2 4))

(fact "Filter huge sequence 1" (filter #(= % 1234567) (map inc (range 10000000))) => '(1234567))

(fact "Filter huge sequence2" (let [ch (channel 0) p (spawn-fiber (fn [] (snd-seq ch (map inc (range 10000000))) (close! ch))) c (spawn-fiber (fn [] (doall (filter #(= % 1234567) (channel->lazy-seq ch)))))] (join c)) => '(12345678))

 
(ns co.paralleluniverse.pulsar.rx-test
  (:use midje.sweet
        co.paralleluniverse.pulsar.core)
  (:refer-clojure :exclude [promise await])
  (:require [co.paralleluniverse.pulsar.rx :as rx]
            [midje.checking.core :as checking]))
(facts "test map and filter"
       (fact "test filter"
             (let [ch (channel)
                   fiber (spawn-fiber
                           (fn []
                             (let [ch1 (rx/filter even? ch)
                                   m1 (rcv ch1)
                                   m2 (rcv ch1)
                                   m3 (rcv ch1)]
                               (list m1 m2 m3))))]
               (sleep 20)
               (snd ch 1)
               (snd ch 2)
               (sleep 20)
               (snd ch 3)
               (snd ch 4)
               (sleep 20)
               (snd ch 5)
               (close! ch)
               (join fiber))  => '(2 4 nil))
       (fact "test map"
             (let [ch (channel)
                   fiber (spawn-fiber
                           (fn []
                             (let [ch1 (rx/map #(+ 10 %) ch)
                                   m1 (rcv ch1)
                                   m2 (rcv ch1)
                                   m3 (rcv ch1)
                                   m4 (rcv ch1)]
                               (list m1 m2 m3 m4))))]
               (sleep 20)
               (snd ch 1)
               (snd ch 2)
               (sleep 20)
               (snd ch 3)
               (close! ch)
               (join fiber))  => '(11 12 13 nil))
       (fact "test filter then map"
             (let [ch (channel)
                   fiber (spawn-fiber
                           (fn []
                             (let [ch1 (rx/map #(+ 10 %) (rx/filter even? ch))
                                   m1 (rcv ch1)
                                   m2 (rcv ch1)
                                   m3 (rcv ch1)]
                               (list m1 m2 m3))))]
               (sleep 20)
               (snd ch 1)
               (snd ch 2)
               (sleep 20)
               (snd ch 3)
               (snd ch 4)
               (sleep 20)
               (snd ch 5)
               (close! ch)
               (join fiber))  => '(12 14 nil)))
(facts "test snd- map and filter"
       (fact "test filter"
             (let [ch (channel)
                   fiber (spawn-fiber
                           (fn []
                             (let [m1 (rcv ch)
                                   m2 (rcv ch)
                                   m3 (rcv ch)]
                               (list m1 m2 m3))))
                   ch1 (rx/snd-filter even? ch)]
               (sleep 20)
               (snd ch1 1)
               (snd ch1 2)
               (sleep 20)
               (snd ch1 3)
               (snd ch1 4)
               (sleep 20)
               (snd ch1 5)
               (close! ch1)
               (join fiber))  => '(2 4 nil))
       (fact "test map"
             (let [ch (channel)
                   fiber (spawn-fiber
                           (fn []
                             (let [m1 (rcv ch)
                                   m2 (rcv ch)
                                   m3 (rcv ch)
                                   m4 (rcv ch)]
                               (list m1 m2 m3 m4))))
                   ch1 (rx/snd-map #(+ 10 %) ch)]
               (sleep 20)
               (snd ch1 1)
               (snd ch1 2)
               (sleep 20)
               (snd ch1 3)
               (close! ch1)
               (join fiber))  => '(11 12 13 nil))
       (fact "test filter then map"
             (let [ch (channel)
                   fiber (spawn-fiber
                           (fn []
                             (let [m1 (rcv ch)
                                   m2 (rcv ch)
                                   m3 (rcv ch)]
                               (list m1 m2 m3))))
                   ch1 (rx/snd-map #(+ 10 %) (rx/snd-filter even? ch))]
               (sleep 20)
               (snd ch1 1)
               (snd ch1 2)
               (sleep 20)
               (snd ch1 3)
               (snd ch1 4)
               (sleep 20)
               (snd ch1 5)
               (close! ch1)
               (join fiber))  => '(12 14 nil)))
(facts "test group"
       (fact "Receive from channel group"
             (let [ch1 (channel)
                   ch2 (channel)
                   ch3 (channel)
                   grp (rx/group ch1 ch2 ch3)
                   fiber (spawn-fiber
                           (fn []
                             (let [m1 (rcv grp)
                                   m2 (rcv ch2)
                                   m3 (rcv grp)]
                               (list m1 m2 m3))))]

               (sleep 20)
               (snd ch1 "hello")
               (sleep 20)
               (snd ch2 "world!")
               (sleep 20)
               (snd ch3 "foo")
               (join fiber))  => '("hello" "world!" "foo"))
       (fact "Receive from channel group with timeout"
             (let [ch1 (channel)
                   ch2 (channel)
                   ch3 (channel)
                   grp (rx/group ch1 ch2 ch3)
                   fiber (spawn-fiber
                           (fn []
                             (let [m1 (rcv grp)
                                   m2 (rcv grp 10 :ms)
                                   m3 (rcv grp 100 :ms)]
                               (list m1 m2 m3))))]
               (sleep 20)
               (snd ch1 "hello")
               (sleep 100)
               (snd ch3 "world!")
               (join fiber))  => '("hello" nil "world!")))
(fact "test zip"
      (let [ch1 (channel 10)
            ch2 (channel 10)
            fiber (spawn-fiber
                    (fn []
                      (let [ch (rx/zip ch1 ch2)
                            m1 (rcv ch)
                            m2 (rcv ch)
                            m3 (rcv ch)]
                        (list m1 m2 m3))))]
        (sleep 20)
        (snd ch1 "a")
        (snd ch1 "b")
        (sleep 20)
        (snd ch2 1)
        (sleep 20)
        (snd ch2 2)
        (snd ch2 3)
        (sleep 20)
        (snd ch1 "c")
        (close! ch1)
        (close! ch2)
        (join fiber))  => '(["a" 1] ["b" 2] ["c" 3]))
(fact "test mapcat"
      (let [ch (channel 10)
            fiber (spawn-fiber
                    (fn []
                      (let
                          [ch1 (rx/mapcat (fn [x]
                                            (cond
                                              (= x 3) nil
                                              (even? x) [(* 10 x) (* 100 x) (* 1000 x)]
                                              :else x))
                                          ch)
                           m1 (rcv ch1)
                           m2 (rcv ch1)
                           m3 (rcv ch1)
                           m4 (rcv ch1)
                           m5 (rcv ch1)
                           m6 (rcv ch1)
                           m7 (rcv ch1)
                           m8 (rcv ch1)
                           m9 (rcv ch1)]
                        (list m1 m2 m3 m4 m5 m6 m7 m8 m9))))]
        (sleep 20)
        (snd ch 1)
        (snd ch 2)
        (sleep 20)
        (snd ch 3)
        (snd ch 4)
        (snd ch 5)
        (close! ch)
        (join fiber))  => '(1 20 200 2000 40 400 4000 5 nil))
(fact "test snd-mapcat"
      (let [ch1 (channel 10)
            fiber (spawn-fiber
                    (fn []
                      (let
                          [
                           m1 (rcv ch1)
                           m2 (rcv ch1)
                           m3 (rcv ch1)
                           m4 (rcv ch1)
                           m5 (rcv ch1)
                           m6 (rcv ch1)
                           m7 (rcv ch1)
                           m8 (rcv ch1)
                           m9 (rcv ch1)]
                        (list m1 m2 m3 m4 m5 m6 m7 m8 m9))))
            ch (rx/snd-mapcat
                 (fn [x]
                   (cond
                     (= x 3) nil
                     (even? x) [(* 10 x) (* 100 x) (* 1000 x)]
                     :else x))
                 ch1
                 (channel 1))]
        (sleep 20)
        (snd ch 1)
        (snd ch 2)
        (sleep 20)
        (snd ch 3)
        (snd ch 4)
        (snd ch 5)
        (close! ch)
        (join fiber))  => '(1 20 200 2000 40 400 4000 5 nil))
(fact "test fiber-transform"
      (let [in (channel)
            out (channel)
            fiber (spawn-fiber
                    (fn []
                      (list (rcv out) (rcv out) (rcv out) (rcv out))))]
        (rx/fiber-transform in out (fn [in out]
                                     (if-let [x (rcv in)]
                                       (do
                                         (when (zero? (mod x 2))
                                           (snd out (* x 10)))
                                         (recur in out))
                                       (do (snd out 1234)
                                           (close! out)))))
        (sleep 20)
        (snd in 1)
        (snd in 2)
        (sleep 20)
        (snd in 3)
        (snd in 4)
        (sleep 20)
        (snd in 5)
        (close! in)
        (join fiber))  => '(20 40 1234 nil))
 
(ns co.paralleluniverse.pulsar.async.api_test
  (:use midje.sweet)
  (:refer-clojure :exclude [map into reduce merge take partition partition-by])
  (:require [co.paralleluniverse.pulsar.core :as p])
  (:require [co.paralleluniverse.pulsar.async :refer :all :as a])
  (:import (co.paralleluniverse.strands Strand)))
(defn default-chan []
  (chan 1))
(fact "Buffers"
      (fact "(buffer 1) not unblocking"
            (unblocking-buffer? (buffer 1)) => false)
      (fact "(dropping-buffer 1) unblocking"
            (unblocking-buffer? (dropping-buffer 1)) => true)
      (fact "(sliding-buffer 1) unblocking"
            (unblocking-buffer? (sliding-buffer 1)) => true))
(fact "Basic channel behavior"
      (let [c (default-chan)
            f (future (<!! c))]
      (>!! c 42)
      @f => 42))
(def DEREF_WAIT 20)
(fact "Writes block on full blocking buffer"
  (let [c (default-chan)
        _ (>!! c 42)
        blocking (deref (future (>!! c 43)) DEREF_WAIT :blocked)]
    blocking => :blocked))
(fact "Unfulfilled readers block"
      (let [c (default-chan)
            r1 (future (<!! c))
            r2 (future (<!! c))
            _ (>!! c 42)
            r1v (deref r1 DEREF_WAIT :blocked)
            r2v (deref r2 DEREF_WAIT :blocked)]
        (and (or (= r1v :blocked) (= r2v :blocked))
             (or (= 42 r1v) (= 42 r2v))) => true))
(fact "<!! and put!"
      (let [executed (p/promise)
            test-channel (chan nil)]
        (put! test-channel :test-val (fn [_] (deliver executed true)))
        (fact "The provided callback does not execute until a reader can consume the written value."
              (not (realized? executed)) => true)
        (fact "The written value is provided over the channel when a reader arrives."
              (<!! test-channel) => :test-val)
        (fact "The provided callback executes once the reader has arrived."
              @executed => true)))
(fact "!! and take!"
  (fact "The written value is the value provided to the read callback."
        (let [read-promise (p/promise)
              test-channel (chan nil)]
           (take! test-channel #(deliver read-promise %))
           (fact "The read waits until a writer provides a value."
                 (realized? read-promise) => false)
           (>!! test-channel :test-val)
           (deref read-promise 1000 false)) => :test-val))
(fact "take! on-caller?"
      (fact "When on-caller? requested, but no value is immediately available, take!'s callback executes on another strand."
            (apply not= (let [starting-strand (Strand/currentStrand)
                              test-channel (chan nil)
                              read-promise (p/promise)]
                          (take! test-channel (fn [_] (deliver read-promise (Strand/currentStrand))) true)
                          (>!! test-channel :foo)
                          [starting-strand @read-promise]))
            => true)
      (fact "When on-caller? requested, and a value is ready to read, take!'s callback executes on the same strand."
            (apply = (let [starting-strand (Strand/currentStrand)
                           test-channel (chan nil)
                           read-promise (p/promise)]
                       (put! test-channel :foo (constantly nil))
                       (Strand/sleep 100) ; Make (almost) sure the reading fiber is blocked on the channel before put!
                       (take! test-channel (fn [_] (deliver read-promise (Strand/currentStrand))) true)
                       [starting-strand @read-promise]))
            => true)
      (fact "When on-caller? is false, and a value is ready to read, take!'s callback executes on a different strand."
            (apply not= (let [starting-strand (Strand/currentStrand)
                              test-channel (chan nil)
                              read-promise (p/promise)]
                          (put! test-channel :foo (constantly nil))
                          (take! test-channel (fn [_] (deliver read-promise (Strand/currentStrand))) false)
                          [starting-strand @read-promise]))
            => true))
(fact "put! on caller?"
      (fact "When on-caller? requested, and a reader can consume the value, put!'s callback executes on the same strand."
            (apply = (let [starting-strand (Strand/currentStrand)
                           test-channel (chan nil)
                           write-promise (p/promise)]
                       (take! test-channel (fn [_] nil))
                       (Strand/sleep 100) ; Make (almost) sure the reading fiber is blocked on the channel before put!
                       (put! test-channel :foo (fn [_] (deliver write-promise (Strand/currentStrand))) true)
                       [starting-strand @write-promise]))
            => true)
      (fact "When on-caller? is false, but a reader can consume the value, put!'s callback executes on a different strand."
            (apply not= (let [starting-strand (Strand/currentStrand)
                              test-channel (chan nil)
                              write-promise (p/promise)]
                          (take! test-channel (fn [_] nil))
                          (put! test-channel :foo (fn [_] (deliver write-promise (Strand/currentStrand))) false)
                          [starting-strand @write-promise]))
            => true)
     (fact "When on-caller? requested, but no reader can consume the value, put!'s callback executes on a different strand."
           (apply not= (let [starting-strand (Strand/currentStrand)
                             test-channel (chan nil)
                             write-promise (p/promise)]
                         (put! test-channel :foo (fn [_] (deliver write-promise (Strand/currentStrand))) true)
                         (take! test-channel (fn [_] nil))
                         [starting-strand @write-promise]))
           => true))
(fact "puts-fulfill-when-buffer-available"
      (= :proceeded
        (let [c (chan 1)
              p (promise)]
          (>!! c :full)  ;; fill up the channel
          (put! c :enqueues (fn [_] (deliver p :proceeded)))  ;; enqueue a put
          (<!! c)        ;; make room in the buffer
          (deref p 250 :timeout))) => true)
(def ^:dynamic test-dyn false)
(fact "thread tests"
      (binding [test-dyn true]
        (fact "bindings"
              (<!! (thread test-dyn))
              => true)))
(fact "fiber tests"
     (binding [test-dyn true]
       (fact "bindings"
             (<!! (fiber test-dyn))
             => true)))
(fact "ops tests"

      (fact "onto-chan"
            (<!! (a/into [] (a/to-chan (range 10))))
            => (range 10))

      (fact "pipe"
            (let [out (chan)]
              (pipe (a/to-chan [1 2 3 4 5])
                    out)
              (<!! (a/into [] out)))
            => [1 2 3 4 5])

      (fact "map"
            (<!! (a/into [] (a/map + [(a/to-chan (range 4))
                                      (a/to-chan (range 4))
                                      (a/to-chan (range 4))
                                      (a/to-chan (range 4))])))
            => [0 4 8 12])

      (fact "split"
            ;; Must provide buffers for channels else the tests won't complete
            (let [[even odd] (a/split even? (a/to-chan [1 2 3 4 5 6]) 5 5)]
              (fact (<!! (a/into [] even))
                    => [2 4 6])
              (fact (<!! (a/into [] odd))
                    => [1 3 5])))

      (fact "merge"
            ;; merge uses alt, so results can be in any order, we're using
            ;; frequencies as a way to make sure we get the right result.
            (frequencies (<!! (a/into [] (a/merge [(a/to-chan (range 4))
                                                   (a/to-chan (range 4))
                                                   (a/to-chan (range 4))
                                                   (a/to-chan (range 4))]))))
            => {0 4
                1 4
                2 4
                3 4})

      (fact "mix"
            (let [out (chan)
                  mx (mix out)]
              (admix mx (a/to-chan [1 2 3]))
              (admix mx (a/to-chan [4 5 6]))

              (<!! (a/into #{} (a/take 6 out))))
            => #{1 2 3 4 5 6})

      (fact "mult"
            (let [a (chan 4)
                  b (chan 4)
                  src (chan)
                  m (mult src)]
              (tap m a)
              (tap m b)
              (pipe (a/to-chan (range 4)) src)
              (fact (<!! (a/into [] a))
                    => [0 1 2 3])
              (fact (<!! (a/into [] b))
                    => [0 1 2 3])))

      (fact "pub-sub"
            (let [a-ints (chan 5)
                  a-strs (chan 5)
                  b-ints (chan 5)
                  b-strs (chan 5)
                  src (chan)
                  p (pub src (fn [x]
                               (if (string? x)
                                 :string
                                 :int)))]
              (sub p :string a-strs)
              (sub p :string b-strs)
              (sub p :int a-ints)
              (sub p :int b-ints)
              (pipe (a/to-chan [1 "a" 2 "b" 3 "c"]) src)
              (fact (<!! (a/into [] a-ints))
                    => [1 2 3])
              (fact (<!! (a/into [] b-ints))
                    => [1 2 3])
              (fact (<!! (a/into [] a-strs))
                    => ["a" "b" "c"])
              (fact (<!! (a/into [] b-strs))
                    => ["a" "b" "c"]))))
 

Verify that exceptions thrown on a thread pool managed by core.async will propagate out to the JVM's default uncaught exception handler.

(ns co.paralleluniverse.pulsar.async.exceptions-test
  (:use midje.sweet)
  (:require
    [clojure.stacktrace :refer [root-cause]]
    [co.paralleluniverse.pulsar.core :as p]
    [co.paralleluniverse.pulsar.async :refer [chan go thread put! take! <!! >!!]])
  (:import (co.paralleluniverse.fibers Fiber)
           (co.paralleluniverse.strands Strand$UncaughtExceptionHandler)))
(p/defsfn with-default-uncaught-exception-handler-fiber [handler f]
  (let [old-handler-fiber (Fiber/getDefaultUncaughtExceptionHandler)]
    (try
      (Fiber/setDefaultUncaughtExceptionHandler
        (reify Strand$UncaughtExceptionHandler
          (uncaughtException [_ strand throwable]
            (handler strand throwable))))
      (f)
      (Fiber/setDefaultUncaughtExceptionHandler old-handler-fiber)
      (catch Throwable t (.printStackTrace t) (throw t)))))
(p/defsfn with-default-uncaught-exception-handler-thread [handler f]
  (let [old-handler-thread (Thread/getDefaultUncaughtExceptionHandler)]
    (try
      (Thread/setDefaultUncaughtExceptionHandler
        (reify Thread$UncaughtExceptionHandler
          (uncaughtException [_ thread throwable]
            (handler thread throwable))))
      (f)
      (Thread/setDefaultUncaughtExceptionHandler old-handler-thread)
      (catch Throwable t (.printStackTrace t) (throw t)))))
(fact "Exception in thread"
      (let [log (p/promise)]
        (with-default-uncaught-exception-handler-thread
          (p/sfn [_ throwable] (deliver log throwable))
          (p/sfn []
                 (let [ex (Exception. "This exception is expected")
                       ret (thread (throw ex))]
                   (<!! ret)
                   (fact (identical? ex (root-cause @log)) => true))))))
(fact "Exception in go"
      (let [log (p/promise)]
        (with-default-uncaught-exception-handler-fiber
          (p/sfn [_ throwable] (deliver log throwable))
          (p/sfn []
                 (let [ex (Exception. "This exception is expected")
                       ret (go (throw ex))]
                   (<!! ret)
                   (fact (root-cause @log) => ex))))))
(fact "Exception in put callback"
      (let [log (p/promise)]
        (with-default-uncaught-exception-handler-fiber
          (p/sfn [_ throwable] (deliver log throwable))
          (p/sfn []
                 (let [ex (Exception. "This exception is expected")
                       c (chan)]
                   (put! c :foo (fn [_] (throw ex)))
                   (<!! c)
                   (fact (root-cause @log) => ex))))))
(fact "Exception in take callback"
      (let [log (p/promise)]
        (with-default-uncaught-exception-handler-fiber
          (p/sfn [_ throwable] (deliver log throwable))
          (p/sfn []
                 (let [ex (Exception. "This exception is expected")
                       c (chan)]
                   (take! c (fn [_] (throw ex)))
                   (>!! c :foo)
                   (fact (root-cause @log) => ex))))))
 
(ns co.paralleluniverse.pulsar.async.pipeline-test
  (:use midje.sweet)
  (:require [co.paralleluniverse.pulsar.core :as p]
            [co.paralleluniverse.pulsar.async :as a :refer [<! >! <!! >!! go go-loop thread fiber chan close! to-chan
                                                            pipeline pipeline-blocking pipeline-async]])
  (:import (co.paralleluniverse.strands Strand)))

in Clojure 1.7, use (map f) instead of this

(defn mapping [f]
  (p/sfn [f1]
    (p/sfn
      ([] (f1))
      ([result] (f1 result))
      ([result input]
         (f1 result (f input)))
      ([result input & inputs]
         (f1 result (apply f input inputs))))))
(defn pipeline-tester [pipeline-fn n inputs xf]
  (let [cin (to-chan inputs)
        cout (chan 1)]
    (pipeline-fn n cout xf cin)
    (<!! (go-loop [acc []] 
                  (let [val (<! cout)]
                    (if (not (nil? val))
                          (recur (conj acc val))
                          acc))))))
(def identity-mapping (mapping identity))
(defn identity-async [v ch] (thread (>!! ch v) (close! ch)))
(p/defsfn identity-async-fiber [v ch] (fiber (>!! ch v) (close! ch)))
(tabular "Test sizes"
  (fact
    (let [r (range ?size)]
      (and
       (= r (pipeline-tester pipeline ?n r identity-mapping))
       (= r (pipeline-tester pipeline-blocking ?n r identity-mapping))
       (= r (pipeline-tester pipeline-async ?n r identity-async))
       (= r (pipeline-tester pipeline-async ?n r identity-async-fiber))
       )) => true)
    ?n ?size
    1 0
    1 10
    10 10
    20 10
    5 1000)
(fact "Test close?"
  (doseq [pf [pipeline pipeline-blocking]]
    (let [cout (chan 1)]
      (pf 5 cout identity-mapping (to-chan [1]) true)
      (fact (<!! cout) => 1)
      (fact (<!! cout) => nil))
    (let [cout (chan 1)]
      (pf 5 cout identity-mapping (to-chan [1]) false)
      (fact (<!! cout) => 1)
      (>!! cout :more)
      (fact (<!! cout) => :more))
    (let [cout (chan 1)]
      (pf 5 cout identity-mapping (to-chan [1]) nil)
      (fact (<!! cout) => 1)
      (>!! cout :more)
      (fact (<!! cout) => :more))))
(fact "Test ex-handler"
  (doseq [pf [pipeline pipeline-blocking]]
    (let [cout (chan 1)
          chex (chan 1)
          ex-mapping (mapping (p/sfn [x] (if (= x 3) (throw (ex-info "err" {:data x})) x)))
          ex-handler (p/sfn [e] (do (>!! chex e) :err))]
      (pf 5 cout ex-mapping (to-chan [1 2 3 4]) true ex-handler)
      (fact (<!! cout) => 1)
      (fact (<!! cout) => 2)
      (fact (<!! cout) => :err)
      (fact (<!! cout) => 4)
      (fact (ex-data (<!! chex)) => {:data 3}))))
(p/defsfn multiplier-async [v ch]
  (fiber
    (dotimes [i v]
      (>!! ch i))
    (close! ch)))
(fact "Test af-multiplier"
  (pipeline-tester pipeline-async 2 (range 1 5) multiplier-async)
  => [0 0 1 0 1 2 0 1 2 3])
(def sleep-mapping (mapping (p/sfn [ms] (do (Strand/sleep ms) ms))))
(let [times [2000 50 1000 100]]
  (fact "Test blocking"
    (pipeline-tester pipeline-blocking 2 times sleep-mapping)
    => times))
(p/defsfn slow-fib [n]
  (if (< n 2) n (+ (slow-fib (- n 1)) (slow-fib (- n 2)))))
(let [input (take 5 (cycle (range 15 38)))]
  (fact "Test compute"
    (last (pipeline-tester pipeline 8 input (mapping slow-fib)))
    => (slow-fib (last input))))
(fact "Test async"
  (pipeline-tester pipeline-async 1 (range 100)
                   (p/sfn [v ch] (future (>!! ch (inc v)) (close! ch))))
  => (range 1 101))
 

A binary-data buffer message example

(ns co.paralleluniverse.pulsar.examples.binary
  (:use [co.paralleluniverse.pulsar core actors]
        [gloss core io])
  (:refer-clojure :exclude [promise await]))

This is an example of sending, receiving and matching binary data buffers

This is the layout of our binary buffer:

(def fr (compile-frame {:a :int16, :b :float32}))
(defsfn receiver []
  (receive [buffer #(decode fr %)]
           {:a 1 :b b} (println "Got buffer (a=1) b: " b)
           {:a a :b b} (println "Got unexpected buffer" buffer "a: " a "b: " b)
           :after 100 (println "timeout!")))
(defn -main []
  (let [r (spawn receiver)
        buffer (encode fr {:a 1 :b 2.3})]
    (! r buffer)
    (join r)))
 

Hot code swapping example

(ns co.paralleluniverse.pulsar.examples.codeswap
  (:use [co.paralleluniverse.pulsar core actors])
  (:refer-clojure :exclude [promise await]))
(defsfn a [n]
        (println "I'm a simple actor" n)
        (when-let [m (receive-timed 1000)]
          (println "message:" m))
        (recur-swap a (inc n)))
(def s (sreify Server
         (init [_])
         (terminate [_ cause])
         (handle-call [_ from id [a b]]
           (sleep 50)
           (+ a b))))
(defn -main []
  (println "starting")
  (let [actor (spawn a 1)
        server (spawn (gen-server s))
        sender (spawn (fn [i]
                        (! actor i)
                        (println "server responded:" (call! server i 10))
                        (sleep 1500)
                        (recur (inc i)))
                      0)]
    (sleep 8 :sec)
    (println "swapping")
    (defsfn a [n]
            (println "I'm a simple, but better, actor" n)
            (when-let [m (receive-timed 1000)]
              (println "message!" m))
            (recur-swap a (inc n)))
    (def s (sreify Server
             (init [_])
             (terminate [_ cause])
             (handle-call [_ from id [a b]]
               (sleep 50)
               (* a 1000))))
    (sleep 8 :sec)
    ;(join actor)
    (println "done")))
 
(ns co.paralleluniverse.pulsar.examples.graph
  (:use [co.paralleluniverse.pulsar core actors])
  (:refer-clojure :exclude [promise await]))
(defn- actors-in-a-ring
  [n]
  (let [actor (fn [] (spawn
                      #(loop [npings (int n)
                              npongs (int n)]
                         (when (or (pos? npings) (pos? npongs))
                           (receive
                            [:start actors] (do
                                              (doseq [actor actors]
                                                (! actor [:ping @self])) ; can also try with !!
                                              (recur npings npongs))
                            [:ping from]    (do
                                              (! from :pong)
                                              (recur (dec npings) npongs))
                            :pong           (recur npings (dec npongs)))))))
        actors (take n (repeatedly actor))]
    actors))
(defn run
  [n]
  (let [all (actors-in-a-ring n)]
    (doseq [actor all]
      (! actor [:start all]))
    (doseq [actor all]
      (join actor))))
(defn -main
  ([] (-main "100"))
  ([times & _]
   (let [n (Integer/parseInt times)]
     (time
      (dotimes [_ n]
        (time (run n)))))
   (System/exit 0)))
 

Uses Pulsar's fiber-blocking IO

(ns co.paralleluniverse.pulsar.examples.io
  (:use [co.paralleluniverse.pulsar core])
  (:refer-clojure :exclude [promise await])
  (:import [co.paralleluniverse.fibers.io FiberSocketChannel FiberServerSocketChannel]
           [java.nio ByteBuffer CharBuffer]
           [java.nio.charset Charset]
           [java.net InetSocketAddress]))
(def port 1234)
(defn buffer->string
  ([byte-buffer]
   (buffer->string byte-buffer (Charset/defaultCharset)))
  ([byte-buffer charset]
   (.toString (.decode charset byte-buffer))))
(defn string->buffer
  ([string]
   (string->buffer string (Charset/defaultCharset)))
  ([string charset]
   (.encode charset string)))
(defn -main
  []
  (let [server (spawn-fiber 
                 (fn [] 
                   (with-open [socket (-> (FiberServerSocketChannel/open) (.bind (InetSocketAddress. port)))
                               ^FiberSocketChannel ch (.accept socket)]
                     (let [buf (ByteBuffer/allocateDirect 1024)]
                       (.read ch buf)
                       (.flip buf)
                       (println "Server got request:" (buffer->string buf))
                       (.write ch (string->buffer "my response"))))))
        client (spawn-fiber 
                  (fn [] 
                    (with-open [^FiberSocketChannel ch (FiberSocketChannel/open (InetSocketAddress. port))]
                      (let [buf (ByteBuffer/allocateDirect 1024)]
                        (println "Client sending request")
                        (.write ch (string->buffer "a request"))
                        (.read ch buf)
                        (.flip buf)
                        (println "Client got response:" (buffer->string buf))))))]
    (join server)
    (join client)))
 

The classic ping-pong example from the Erlang tutorial

(ns co.paralleluniverse.pulsar.examples.pingpong
  (:use [co.paralleluniverse.pulsar core actors])
  (:refer-clojure :exclude [promise await]))

This example is intended to be a line-by-line translation of the canonical Erlang ping-pong example, so it is not written in idiomatic Clojure.

(defsfn ping [n pong]
  (if (== n 0)
    (do
      (! pong :finished)
      (println "ping finished"))
    (do
      (! pong [:ping @self])
      (receive
        :pong (println "Ping received pong"))
      (recur (dec n) pong))))
(defsfn pong []
  (receive
    :finished (println "Pong finished")
    [:ping ping] (do
                   (println "Pong received ping")
                   (! ping :pong)
                   (recur))))
(defn -main []
  (let [a1 (spawn pong)
        b1 (spawn ping 3 a1)]
    :ok))
 

The classic ping-pong example from the Erlang tutorial

(ns co.paralleluniverse.pulsar.examples.pingpong-register
  (:use [co.paralleluniverse.pulsar core actors])
  (:refer-clojure :exclude [promise await]))

This example is intended to be a line-by-line translation of the canonical Erlang ping-pong example, so it is not written in idiomatic Clojure.

(defsfn ping [n]
  (if (== n 0)
    (do
      (! :pong :finished)
      (println "ping finished"))
    (do
      (! :pong [:ping @self])
      (receive
        :pong (println "Ping received pong"))
      (recur (dec n)))))
(defsfn pong []
  (receive
    :finished (println "Pong finished")
    [:ping ping] (do
                   (println "Pong received ping")
                   (! ping :pong)
                   (recur))))
(defn -main []
  (register! :pong (spawn pong))
  (spawn ping 3)
  :ok)
 

An implementation of the ring benchmark using fibers and primitive (int) channels

(ns co.paralleluniverse.pulsar.examples.primitive-ring-benchmark
  (:use co.paralleluniverse.pulsar.core)
  (:refer-clojure :exclude [promise await]))
(defn spawn-relay [prev n]
  (if (== n 0)
    prev
    (let [channel (int-channel 10)
          fiber (spawn-fiber #(loop []
                                (let [m (rcv-int channel)]
                                  ;(println n ": " m)
                                  (snd-int prev (inc m))
                                  (recur))))]
      (recur channel (dec n)))))
(defn -main [M1 N1]
  (let [M (int (Integer/parseInt M1))
        N  (int (Integer/parseInt N1))]
    (println "M: " M " N: " N)
    (dotimes [i 1000]
      (let [num-messages
            (time
              (let [manager-channel (int-channel 10)
                    last-channel (spawn-relay manager-channel (dec N))
                    manager (spawn-fiber
                              #(do (snd-int last-channel 1)  ; start things off
                                 (loop [j (int 1)]
                                   (let [m (rcv-int manager-channel)]
                                     (if (< j M)
                                       (do
                                         ;(println "m: " m)
                                         (snd-int last-channel (inc m))
                                         (recur (inc j)))
                                       m)))))]
                (join manager)))]
        (println i ": Messages " num-messages)))))
 

A selective receive example

(ns co.paralleluniverse.pulsar.examples.priority
  (:use [co.paralleluniverse.pulsar core actors])
  (:refer-clojure :exclude [promise await]))

This example is intended to be a line-by-line translation of this example from the book Learn You Some Erlang for great good!, so it is not written in idiomatic Clojure

(declare normal)
(defsfn important []
  (receive
   [(priority :guard #(> % 10)) msg] (cons msg (important))
   :after 0 (normal)))
(defsfn normal []
  (receive
   [_ msg] (cons msg (normal))
   :after 0 ()))
(defn -main []
  (join (spawn
         (fn []
           (! @self [15 :high])
           (! @self [7 :low])
           (! @self [1 :low])
           (! @self [17 :high])
           (println (important))))))
 

An implementation of the ring benchmark using actors

(ns co.paralleluniverse.pulsar.examples.ring-benchmark
  (:use [co.paralleluniverse.pulsar core actors])
  (:refer-clojure :exclude [promise await])
  (:import [co.paralleluniverse.actors Actor]))
(defn spawn-relay-actor [^Actor prev n]
  (if (== n 0)
    prev
    (let [actor (spawn :mailbox-size 10
                       #(loop []
                          (! prev (inc (receive)))
                          (recur)))]
      (recur actor (dec n)))))
(defn -main [M1 N1]
  (let [M (Integer/parseInt M1)
        N (Integer/parseInt N1)]
    (println "M: " M " N: " N)
    (dotimes [i 1000]
      (let [num-messages
            (time
             (let [manager
                   (spawn :mailbox-size 10
                          #(let [last-actor (spawn-relay-actor @self (dec N))]
                             (! last-actor 1) ; start things off
                             (loop [j (int 1)]
                               (let [m (receive)]
                                 (if (< j M)
                                   (do
                                     (! last-actor (inc m))
                                     (recur (inc j)))
                                   m)))))]
               (join manager)))]
        (println i ": Messages " num-messages)))))
 

A very simple selective-receive example

(ns co.paralleluniverse.pulsar.examples.selective
  (:use [co.paralleluniverse.pulsar core actors])
  (:refer-clojure :exclude [promise await]))
(defsfn adder []
  (loop []
    (receive
      [from tag [:add a b]] (! from tag [:sum (+ a b)]))
    (recur)))
(defsfn computer [adder]
  (loop []
    (receive [m]
             [from tag [:compute a b c d]] (let [tag1 (maketag)]
                                             (! adder [@self tag1 [:add (* a b) (* c d)]])
                                             (receive
                                               [tag1 [:sum sum]]  (! from tag [:result sum])
                                               :after 10          (! from tag [:error "timeout!"])))
             :else (println "Unknown message: " m))
    (recur)))
(defsfn curious [nums computer]
  (when (seq nums)
    (let [[a b c d] (take 4 nums)
          tag       (maketag)]
      (! computer @self tag [:compute a b c d])
      (receive [m]
               [tag [:result res]]  (println a b c d "->" res)
               [tag [:error error]] (println "ERROR: " a b c d "->" error)
               :else (println "Unexpected message" m))
      (recur (drop 4 nums) computer))))
(defn -main []
  (let [ad (spawn adder)
        cp (spawn computer ad)
        cr (spawn curious (take 20 (repeatedly #(rand-int 10))) cp)]
    (join cr)
    :ok))
 

An implementation of the selective example using gen-server

(ns co.paralleluniverse.pulsar.examples.selective-gen-server
  (:use [co.paralleluniverse.pulsar core actors])
  (:refer-clojure :exclude [promise await])
  (:import [co.paralleluniverse.strands Strand]))
(def adder (gen-server (reify Server
                         (init [_])
                         (terminate [_ cause])
                         (handle-call [_ from id [command a b]]
                                      (case command
                                        :add (+ a b))))))
(defn computer [adder]
  (gen-server (reify Server
                (init [_])
                (terminate [_ cause])
                (handle-call [_ from id [command a b c d]]
                             (case command
                               :compute (call-timed! adder 10 :ms :add (* a b) (* c d)))))))
(defsfn curious [nums computer]
  (when (seq nums)
    (let [[a b c d] (take 4 nums)
          res (call! computer :compute a b c d)]
      (println a b c d "->" res)
      (recur (drop 4 nums) computer))))
(defn -main []
  (let [ad (spawn adder)
        cp (spawn (computer ad))
        cr (spawn curious (take 20 (repeatedly #(rand-int 10))) cp)]
    (join cr)
    :ok))
 
(ns co.paralleluniverse.pulsar.examples.tkb
  (:use co.paralleluniverse.pulsar.core)
  (:import [co.paralleluniverse.strands.channels ReceivePort$EOFException])
  (:refer-clojure :exclude [promise await])
  (:require [co.paralleluniverse.pulsar.rx :as rx]))

(let [numbers (topic) letters (topic)

 f1 (spawn-fiber
      (fn []
        (let [c (->>
                  (rx/zip (->>
                            (subscribe! letters (channel 0))
                            (rx/mapcat #(repeat 3 %)))
                          (->>
                            (subscribe! numbers (channel 0))
                            (rx/filter odd?)
                            (rx/mapcat #(list % (* 10 %) (* 100 %)))))
                  (rx/map (fn [[c n]] (str "letter: " c " number: " n))))]
          (loop []
            (when-let [m (rcv c)]
              (println "=> " m )
              (recur))))))]

(spawn-fiber (fn [] (doseq [x (seq "abcdefghijklmnopqrstuwvxyz")] (sleep 50) (snd letters x)) (close! letters)))

(spawn-fiber (fn [] (doseq [x (range 1000)] (sleep 70) (snd numbers x)) (close! numbers)))

(join [f1]))

(defn -main[])