co.paralleluniverse.pulsar.core

Pulsar is an implementation of lightweight threads (fibers),
Go-like channels and Erlang-like actors for the JVM.

->timeunit

(->timeunit x)
Constructs an instance of `java.util.concurrent.TimeUnit`.
If argument x is already an instance of `TimeUnit`, the function returns x.
Otherwise, x *must* be a keyword, in which case the following conversion
is performed:

:nanoseconds | :nanos | :ns   -> TimeUnit/NANOSECONDS
:microseconds | :us           -> TimeUnit/MICROSECONDS
:milliseconds | :millis | :ms -> TimeUnit/MILLISECONDS
:seconds | :sec               -> TimeUnit/SECONDS
:minutes | :mins              -> TimeUnit/MINUTES
:hours | :hrs                 -> TimeUnit/HOURS
:days                         -> TimeUnit/DAYS

alive?

(alive? a)
Tests whether or not a strand is alive. 
A strand is alive if it has been started but has not yet died.

await

macro

(await f & args)
Calls f, which takes a callback of a single argument as its last parameter,
with arguments args, and blocks the current fiber until the callback is called,
then returns the value passed to the callback.
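
As a rough illustration of the callback convention described above, the sketch below wraps a callback-style function with `await` inside a fiber. It assumes Pulsar is on the classpath and the JVM runs with the Quasar instrumentation agent. `async-add` is a hypothetical helper invented for this example, not part of Pulsar.

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])

;; Hypothetical callback-style function: the callback is its LAST parameter,
;; and it is invoked later, from another thread, with a single argument.
(defn async-add [a b callback]
  (future (callback (+ a b))))

;; Inside a fiber, `await` supplies the callback, blocks the fiber until the
;; callback is called, and returns the value passed to it.
(def awaited
  (p/join
    (p/spawn-fiber
      (fn []
        (p/await async-add 1 2)))))
```

Under these assumptions `awaited` should hold the sum computed by `async-add`.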

channel

(channel capacity overflow-policy single-producer? single-consumer?)(channel capacity overflow-policy)(channel capacity)(channel)
Creates a new channel.

Optional arguments:
capacity         - specifies how many messages the channel can contain (until they are consumed)
                   * A value of `0` designates a *transfer channel*, that blocks both `snd` and `rcv`
                     until a corresponding operation (`rcv` or `snd` respectively) is called.
                   * A value of `-1` creates an unbounded channel.

                   default: 0

overflow-policy  - specifies what `snd` does when the channel's capacity is exhausted.
                   May be one of:
                   * :throw    - throws an exception.
                   * :block    - blocks until a message is consumed and room is available
                   * :drop     - the message is silently dropped
                   * :displace - the old message waiting in the queue is discarded to make room for the new message.

                   default: :block

single-producer? - specifies if the channel should be single-producer.

                   default: false

single-consumer? - specifies if the channel should be single-consumer.

                   default: true

The default channel capacity is 0 and the default policy is :block
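
A minimal sketch of how the arguments above fit together, assuming Pulsar is on the classpath and the Quasar agent is active. It creates a bounded channel, consumes from it in a fiber, and collects the fiber's result with `join`. The names `ch`, `consumer` and `total` are illustrative only.

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])

;; A bounded channel with room for 10 messages; with :block, `snd` waits
;; for room instead of throwing or dropping when the channel is full.
(def ch (p/channel 10 :block))

;; A single consumer fiber (matching the single-consumer? default)
;; receives two messages and returns their sum.
(def consumer
  (p/spawn-fiber
    (fn []
      (+ (p/rcv ch) (p/rcv ch)))))

;; Send from the current (plain) thread.
(p/snd ch 1)
(p/snd ch 2)

;; `join` waits for the fiber to terminate and returns its result.
(def total (p/join consumer))
```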

close!

(close! channel)(close! channel exception)
Closes a channel.
Messages already in the channel will be received, but all future attempts at `snd`
will silently discard the message. After all messages have been consumed, `rcv` will
return `nil`.

If an exception is passed as the second argument, then the same will happen, except after
all messages are consumed, the passed exception will be thrown by `rcv`, wrapped in a
`co.paralleluniverse.strands.channels.ProducerException`.
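
The closing behaviour described above can be sketched as follows, assuming Pulsar is on the classpath. The channel is read from a plain thread here, and the variable names are illustrative.

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])

(def ch (p/channel 5))

(p/snd ch :a)     ; queued before the channel is closed
(p/close! ch)
(p/snd ch :b)     ; sent after closing: silently discarded

(def first-msg   (p/rcv ch))      ; the message queued before closing
(def after-drain (p/rcv ch))      ; nothing left, so `rcv` returns nil
(def closed-now  (p/closed? ch))  ; closed and fully drained
```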

closed?

(closed? channel)
Tests whether a channel has been closed and contains no more messages that 
can be received.

convert-duration

(convert-duration x from-unit to-unit)
Converts a time duration from one time unit to another.
x is the duration; `from-unit` and `to-unit` are the source
and target units respectively, given as either a j.u.c.TimeUnit instance
or as a keyword, as specified by `->timeunit`.
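
For illustration, the sketch below uses the keyword forms accepted by `->timeunit` together with `convert-duration`. It assumes Pulsar is on the classpath; the var names are made up for the example.

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])
(import 'java.util.concurrent.TimeUnit)

;; A keyword and an existing TimeUnit instance resolve to the same unit.
(def unit-from-keyword  (p/->timeunit :ms))
(def unit-from-instance (p/->timeunit TimeUnit/MILLISECONDS))

;; Convert a duration of 2 seconds into milliseconds.
(def two-seconds-in-ms (p/convert-duration 2 :seconds :ms))
```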

create-fiber

(create-fiber & args)
Creates, but does not start, a new fiber (a lightweight thread) running in a fork/join pool.

It is much preferable to use `spawn-fiber`.

current-fiber

(current-fiber)
Returns the currently running fiber (lightweight thread), or `nil` if none.

current-strand

(current-strand)
Returns the currently running fiber (if running in a fiber)
or the current thread (if not).

dbg

macro

(dbg & body)

default-fiber-scheduler

A global fiber scheduler. The scheduler uses all available processor cores.

defsfn

macro

(defsfn & expr)
Defines a suspendable function that can be used by a fiber or actor.
Used exactly like `defn`

double-channel

(double-channel size overflow-policy)(double-channel size)(double-channel)
Creates a double channel

fiber

macro

(fiber & body)
Runs the given body in a newly created fiber and returns it.

fiber->future

(fiber->future f)
Takes a spawned fiber and yields a future object that will cache
the fiber's result and return it on all subsequent calls to deref/@.
If the fiber has not yet finished, calls to deref/@ will block, unless
the variant of deref with timeout is used. See also - realized?.

float-channel

(float-channel size overflow-policy)(float-channel size)(float-channel)
Creates a float channel

int-channel

(int-channel size overflow-policy)(int-channel size)(int-channel)
Creates an int channel

join

(join s)(join timeout unit s)
Awaits the termination of the given strand or strands, and returns
their result, if applicable.

If a single strand is given, its result is returned;
if a collection - then a collection of the respective results.

Note that for threads, the result is always `nil`, as threads don't return a value.

If a timeout is supplied and it elapses before the strand has terminated,
a j.u.c.TimeoutException is thrown.

s       - either a strand or a collection of strands.
timeout - how long to wait for the strands' termination
unit    - the unit of the timeout duration. TimeUnit or keyword as in `->timeunit`

letsfn

macro

(letsfn fnspecs & body)
Defines a local suspendable function that can be used by a fiber or actor.
Used exactly like `letfn`

long-channel

(long-channel size overflow-policy)(long-channel size)(long-channel)
Creates a long channel

promise

(promise f)(promise)
Returns a promise object that can be read with deref/@, and set,
once only, with deliver. Calls to deref/@ prior to delivery will
block, unless the variant of deref with timeout is used. All
subsequent derefs will return the same delivered value without
blocking. See also - realized?.

Unlike clojure.core/promise, this promise object can be used inside Pulsar fibers.
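
A small sketch of reading a promise from inside a fiber, assuming Pulsar is on the classpath, the Quasar agent is active, and `clojure.core/deliver` is used to set the value. The names `answer-promise` and `waiter` are illustrative.

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])

(def answer-promise (p/promise))

;; The fiber blocks on deref until the promise is delivered.
(def waiter
  (p/spawn-fiber
    (fn []
      (inc @answer-promise))))

;; Set the promise once; the waiting fiber can then continue.
(deliver answer-promise 41)

(def answer (p/join waiter))
```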

rcv

(rcv channel)(rcv channel timeout unit)
Receives a message from a channel.
This function will block until a message is available or until the timeout,
if specified, expires.
If a timeout is given, and it expires, rcv returns nil.
Otherwise, the message is returned.

rcv-double

macro

(rcv-double channel)(rcv-double channel timeout unit)
Receives a double value from a double-channel.

See: `rcv`

rcv-float

macro

(rcv-float channel)(rcv-float channel timeout unit)
Receives a float value from a float-channel.

See: `rcv`

rcv-int

macro

(rcv-int channel)(rcv-int channel timeout unit)
Receives an int value from an int-channel.

See: `rcv`

rcv-into

(rcv-into to channel n)
Receives at most n values from the given channel and conjoins them
into the `to` collection.

rcv-long

macro

(rcv-long channel)(rcv-long channel timeout unit)
Receives a long value from a long-channel.

See: `rcv`

sel

(sel ports & {:as opts})
Performs up to one of several given channel operations.
sel takes a collection containing *channel operation descriptors*. A descriptor is 
either a channel or a pair (vector) of a channel and a message. 
Each channel in the sequence represents a `rcv` attempt, and each channel-message pair 
represents a `snd` attempt. 
The `sel` function performs at most one operation on the sequence, a `rcv` or a `snd`, 
which is determined by the first operation that can succeed. If no operation can be 
carried out immediately, `sel` will block until an operation can be performed, or the
optionally specified timeout expires.
If two or more operations are available at the same time, one of them will be chosen
at random, unless the `:priority` option is set to `true`.

Options:
:priority bool -  If set to `true`, then whenever two or more operations are available
                  the first among them, in the order they are listed in the `ports` collection,
                  will be the one executed.
:timeout millis - If timeout is set and expires before any of the operations are available,
                  the function will return `nil`

Returns:
If an operation succeeds, returns a vector `[m ch]` with `m` being the message received if the
operation is a `rcv`, or `nil` if it's a `snd`, and `ch` is the channel on which the successful
operation was performed.
If a timeout is set and expires before any of the operations are available, returns `nil`.
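
The return convention above can be sketched like this, assuming Pulsar is on the classpath. Only one of the two channels has a message ready, so the `rcv` on that channel is the operation expected to succeed. All names are illustrative.

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])

(def c1 (p/channel 1))
(def c2 (p/channel 1))

;; Make a message available on c1 only.
(p/snd c1 "hello")

;; Both descriptors are bare channels, so both are `rcv` attempts.
(def selected (p/sel [c1 c2] :timeout 100))

;; Destructure the result into the message and the channel it came from.
(def selected-message (first selected))
(def selected-channel (second selected))

;; Nothing is available on c2, so this call gives up after 50 ms.
(def timed-out (p/sel [c2] :timeout 50))
```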

select

macro

(select & clauses)
Performs a very similar operation to `sel`, but allows you to specify an action to perform depending 
on which operation has succeeded.
Takes an even number of expressions, ordered as (ops1, action1, ops2, action2 ...) with the ops being 
a channel operation descriptor (remember: a descriptor is either a channel for a `rcv` operation,
or a vector of a channel and a message specifying a `snd` operation) or a collection of descriptors, 
and the actions are Clojure expressions. 
Like `sel`, `select` performs at most one operation, in which case it will run the operation's 
respective action and return its result.

An action expression can bind values to the operations results. 
The action expression may begin with a vector of one or two symbols. In that case, the first symbol 
will be bound to the message returned from the successful receive in the respective ops clause 
(or `nil` if the successful operation is a `snd`), and the second symbol, if present, will be bound 
to the successful operation's channel.

Like `sel`, `select` blocks until an operation succeeds, or, if a `:timeout` option is specified, 
until the timeout (in milliseconds) elapses. If a timeout is specified and elapses, `select` will run
the action in an optional `:else` clause and return its result, or, if an `:else` clause is not present, 
`select` will return `nil`.

Example:

(select :timeout 100 
       c1 ([v] (println "received" v))
       [[c2 m2] [c3 m3]] ([v c] (println "sent to" c))
       :else "timeout!")

In the example, if a message is received from channel `c1`, then it will be printed. 
If a message is sent to either `c2` or `c3`, then the identity of the channel will be printed, 
and if the 100 ms timeout elapses then `select` will return "timeout!".

seq->channel

(seq->channel x)
Turns a sequence into a receive port that receives all values in the sequence

sfn

macro

(sfn & expr)
Creates a suspendable function that can be used by a fiber or actor.
Used exactly like `fn`

singleton-channel

(singleton-channel x)
Returns a channel that receives a single, given value
and then closes

sleep

(sleep ms)(sleep timeout unit)
Suspends the current strand for the given duration.
If no unit is given, the duration is in milliseconds.

snd

(snd channel message)
Sends a message to a channel.
If the channel's overflow policy is `:block` then this function will block
if the channel's capacity is exceeded.

snd-double

macro

(snd-double channel message)
Sends a double value to a double-channel.  

See: `snd`

snd-float

macro

(snd-float channel message)
Sends a float value to a float-channel.  

See: `snd`

snd-int

macro

(snd-int channel message)
Sends an int value to an int-channel.

See: `snd`

snd-long

macro

(snd-long channel message)
Sends a long value to a long-channel.  

See: `snd`

snd-seq

(snd-seq channel ms)
Sends a sequence of messages to a channel

spawn-fiber

macro

(spawn-fiber :name? :stack-size? :scheduler? f & args)
Creates and starts a new fiber.

f - the function to run in the fiber.
args - (optional) arguments for the function

Options:
:name str     - the fiber's name
:stack-size n - the fiber's initial stack size
:scheduler    - the fiber scheduler in which the fiber will run
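
As a hedged sketch of the argument order, the example below names a fiber, passes one argument to its function, and waits for the result with a timeout. It assumes Pulsar is on the classpath and the Quasar agent is active. The fiber name and var names are illustrative.

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])

;; Options come first, then the function, then the function's arguments.
(def worker
  (p/spawn-fiber :name "worker"
                 (fn [x] (* x 2))
                 21))

;; Wait up to one second for the fiber to finish and take its result.
;; The unit is given as a keyword, as accepted by `->timeunit`.
(def doubled (p/join 1 :sec worker))
```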

spawn-thread

(spawn-thread :name? f & args)
Creates and starts a new thread.

f - the function to run in the thread.
args - (optional) arguments to pass to the function

Options:
:name str     - the thread's name

sreify

macro

(sreify & expr)
Creates a suspendable implementation of a protocol or interface.
sreify is to reify what sfn is to fn.

start

(start fiber)
Starts a fiber created with `create-fiber`.

strampoline

(strampoline f)(strampoline f & args)
A suspendable version of trampoline. Should be used to implement
finite-state-machine actors.

trampoline can be used to convert algorithms requiring mutual
recursion without stack consumption. Calls f with supplied args, if
any. If f returns a fn, calls that fn with no arguments, and
continues to repeat, until the return value is not a fn, then
returns that non-fn value. Note that if you want to return a fn as a
final value, you must wrap it in some data structure and unpack it
after trampoline returns.

subscribe!

(subscribe! topic channel)
Subscribes a channel to a topic.
The subscribed channel will receive all messages sent to the topic.

suspendable!

(suspendable! f)(suspendable! x prot)
Makes a function suspendable.

suspendable?

(suspendable? f)
Returns true if a function has been instrumented as suspendable; false otherwise.

tagged

(tagged tag sym)

ticker-consumer

(ticker-consumer ticker)
Creates a rcv-port (read-only channel) that returns messages from a *ticker channel*.
A ticker channel is a bounded channel with an overflow policy of :displace.

Different ticker consumers are independent (a message received from one is not removed from others),
and guarantee monotonicity (messages are received in order), but if messages are sent to the
ticker channel faster than they are consumed then messages can be lost.

topic

(topic)
Creates a new topic.
A topic is a send-port (a write-only channel) that forwards every message sent to it
to a group of subscribed channels.
Use `subscribe!` and `unsubscribe!` to subscribe and unsubscribe a channel to or from
the topic.
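
A minimal sketch of the fan-out behaviour, assuming Pulsar is on the classpath. Two channels subscribe to one topic, and a single `snd` to the topic is expected to reach both. All names are illustrative.

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])

(def news (p/topic))

;; Each subscriber has room for one pending message.
(def subscriber-a (p/channel 1))
(def subscriber-b (p/channel 1))

(p/subscribe! news subscriber-a)
(p/subscribe! news subscriber-b)

;; One send to the topic is forwarded to every subscribed channel.
(p/snd news :headline)

(def received-a (p/rcv subscriber-a))
(def received-b (p/rcv subscriber-b))
```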

try-rcv

(try-rcv channel)
Attempts to immediately (without blocking) receive a message from a channel.
Returns the message if one is immediately available; `nil` otherwise.
This function never blocks.

try-snd

(try-snd channel message)
Tries to immediately send a message to a channel.
If the channel's capacity is exceeded, this function fails and returns `false`.
Returns `true` if the operation succeeded; `false` otherwise.
This function never blocks.
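
The non-blocking variants can be sketched as follows, assuming Pulsar is on the classpath. The channel is given spare capacity so that the send has room to succeed. Names are illustrative.

```clojure
(require '[co.paralleluniverse.pulsar.core :as p])

(def ch (p/channel 10))

;; Nothing has been sent yet, so there is no message to take.
(def nothing-yet (p/try-rcv ch))

;; There is room in the channel, so the send succeeds immediately.
(def accepted (p/try-snd ch :a))

;; The message sent above is now immediately available.
(def taken (p/try-rcv ch))
```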

try-snd-double

macro

(try-snd-double channel message)
Tries to immediately send a double value to a double-channel.
Returns `true` if successful, `false` otherwise.  

See: `try-snd`

try-snd-float

macro

(try-snd-float channel message)
Tries to immediately send a float value to a float-channel.
Returns `true` if successful, `false` otherwise.  

See: `try-snd`

try-snd-int

macro

(try-snd-int channel message)
Tries to immediately send an int value to an int-channel.
Returns `true` if successful, `false` otherwise.

See: `try-snd`

try-snd-long

macro

(try-snd-long channel message)
Tries to immediately send a long value to a long-channel.
Returns `true` if successful, `false` otherwise.  

See: `try-snd`

unsubscribe!

(unsubscribe! topic channel)
Unsubscribes a channel from a topic.
The channel will stop receiving messages sent to the topic.