Class Channels
- java.lang.Object
-
- co.paralleluniverse.strands.channels.Channels
-
public final class Channels extends java.lang.Object
A utility class for creating and manipulating channels.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static class
Channels.OverflowPolicy
Determines how a channel behaves when its internal buffer (if it has one) overflows.
-
Method Summary
All Methods Static Methods Concrete Methods Modifier and Type Method Description static <T> ReceivePort<T>
emptyReceivePort()
Returns an emptyReceivePort
.static <S,T>
voidfiberTransform(FiberFactory fiberFactory, ReceivePort<S> in, SendPort<T> out, SuspendableAction2<? extends ReceivePort<? super S>,? extends SendPort<? extends T>> transformer)
Spawns a fiber that transforms values read from thein
channel and writes values to theout
channel.static <S,T>
voidfiberTransform(ReceivePort<S> in, SendPort<T> out, SuspendableAction2<? extends ReceivePort<? super S>,? extends SendPort<? extends T>> transformer)
Spawns a fiber that transforms values read from thein
channel and writes values to theout
channel.static <M> ReceivePort<M>
filter(ReceivePort<M> channel, com.google.common.base.Predicate<M> pred)
Returns aReceivePort
that filters messages that satisfy a predicate from a given channel.static <M> SendPort<M>
filterSend(SendPort<M> channel, com.google.common.base.Predicate<M> pred)
Returns aSendPort
that filters messages that satisfy a predicate before sending to a given channel.static <S,T>
ReceivePort<T>flatMap(ReceivePort<S> channel, com.google.common.base.Function<S,ReceivePort<T>> f)
Returns aReceivePort
that receives messages that are transformed by a given flat-mapping function from a given channel.static <S,T>
SendPort<S>flatMapSend(FiberFactory fiberFactory, Channel<S> pipe, SendPort<T> channel, com.google.common.base.Function<S,ReceivePort<T>> f)
Returns aSendPort
that sends messages that are transformed by a given flat-mapping function into a given channel.static <S,T>
SendPort<S>flatMapSend(Channel<S> pipe, SendPort<T> channel, com.google.common.base.Function<S,ReceivePort<T>> f)
static <T> void
forEach(ReceivePort<T> channel, SuspendableAction1<T> action)
Performs the given action on each message received by the given channel.static <M> ReceivePort<M>
group(ReceivePort<? extends M>... channels)
Returns aReceivePort
that receives messages from a set of channels.static <M> ReceivePort<M>
group(java.util.Collection<? extends ReceivePort<? extends M>> channels)
Returns aReceivePort
that receives messages from a set of channels.static boolean
isTickerChannel(ReceivePort<?> channel)
Tests whether a given channel is a ticker channel, namely a channel with a bounded buffer and anoverflow policy
ofDISPLACE
.static <S,T>
ReceivePort<T>map(ReceivePort<S> channel, com.google.common.base.Function<S,T> f)
Returns aReceivePort
that receives messages that are transformed by a given mapping function from a given channel.static <T> ReceivePort<T>
mapErrors(ReceivePort<T> channel, com.google.common.base.Function<java.lang.Exception,T> f)
Returns aReceivePort
that maps exceptions thrown by the underlying channel (by channel transformations, or as a result ofSendPort.close(Throwable)
) into messages.static <S,T>
SendPort<S>mapSend(SendPort<T> channel, com.google.common.base.Function<S,T> f)
Returns aSendPort
that transforms messages by applying a given mapping function before sending them to a given channel.static <M> Mix<? extends M>
mix(ReceivePort<? extends M>... channels)
Returns aMix
that receives messages from a set of channels.static <M> Mix<? extends M>
mix(java.util.Collection<? extends ReceivePort<? extends M>> channels)
Returns aMix
that receives messages from a set of channels.static <Message> Channel<Message>
newChannel(int bufferSize)
Creates a new channel with the given mailbox size with other properties set to their default values.static <Message> Channel<Message>
newChannel(int bufferSize, Channels.OverflowPolicy policy)
Creates a new channel with the given mailbox size andChannels.OverflowPolicy
, with other properties set to their default values.static <Message> Channel<Message>
newChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)
Creates a new channel with the given properties.static DoubleChannel
newDoubleChannel(int bufferSize)
Creates a new primitivedouble
channel with the given mailbox size with other properties set to their default values.static DoubleChannel
newDoubleChannel(int bufferSize, Channels.OverflowPolicy policy)
Creates a new primitivedouble
channel with the given mailbox size andChannels.OverflowPolicy
, with other properties set to their default values.static DoubleChannel
newDoubleChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)
Creates a new primitivedouble
channel with the given properties.static FloatChannel
newFloatChannel(int bufferSize)
Creates a new primitivefloat
channel with the given mailbox size with other properties set to their default values.static FloatChannel
newFloatChannel(int bufferSize, Channels.OverflowPolicy policy)
Creates a new primitivefloat
channel with the given mailbox size andChannels.OverflowPolicy
, with other properties set to their default values.static FloatChannel
newFloatChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)
Creates a new primitivefloat
channel with the given properties.static IntChannel
newIntChannel(int bufferSize)
Creates a new primitiveint
channel with the given mailbox size with other properties set to their default values.static IntChannel
newIntChannel(int bufferSize, Channels.OverflowPolicy policy)
Creates a new primitiveint
channel with the given mailbox size andChannels.OverflowPolicy
, with other properties set to their default values.static IntChannel
newIntChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)
Creates a new primitiveint
channel with the given properties.static LongChannel
newLongChannel(int bufferSize)
Creates a new primitivelong
channel with the given mailbox size with other properties set to their default values.static LongChannel
newLongChannel(int bufferSize, Channels.OverflowPolicy policy)
Creates a new primitivelong
channel with the given mailbox size andChannels.OverflowPolicy
, with other properties set to their default values.static LongChannel
newLongChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)
Creates a new primitivelong
channel with the given properties.static <Message> ReceivePort<Message>
newTickerConsumerFor(Channel<Message> channel)
Creates aReceivePort
that can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACE
overflow policy.static DoubleReceivePort
newTickerConsumerFor(DoubleChannel channel)
Creates aDoubleReceivePort
that can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACE
overflow policy.static FloatReceivePort
newTickerConsumerFor(FloatChannel channel)
Creates aFloatReceivePort
that can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACE
overflow policy.static IntReceivePort
newTickerConsumerFor(IntChannel channel)
Creates anIntReceivePort
that can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACE
overflow policy.static LongReceivePort
newTickerConsumerFor(LongChannel channel)
Creates aLongReceivePort
that can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACE
overflow policy.static <S,T>
ReceivePort<T>reduce(ReceivePort<S> channel, co.paralleluniverse.common.util.Function2<T,S,T> f, T init)
Returns aReceivePort
providing messages that are transformed from a given channel by a given reduction function.static <S,T>
SendPort<S>reduceSend(SendPort<T> channel, co.paralleluniverse.common.util.Function2<T,S,T> f, T init)
Returns aSendPort
accepting messages that are transformed by a reduction function.static <T> ReceivePort<T>
singletonReceivePort(T object)
Returns a newly createdReceivePort
that receives a single message: the object given to the function.static <T> ReceivePort<T>
take(ReceivePort<T> channel, long count)
static <T> ReceivePort<T>
toReceivePort(java.lang.Iterable<T> iterable)
Returns a newly createdReceivePort
that receives all the elements iterated by the iterable.static <T> ReceivePort<T>
toReceivePort(java.util.Iterator<T> iterator)
Returns a newly createdReceivePort
that receives all the elements iterated by the iterator.static <M> TransformingReceivePort<M>
transform(ReceivePort<M> channel)
Returns aTransformingReceivePort
wrapping the given channel, which may be used for functional transformations.static <M> TransformingSendPort<M>
transformSend(SendPort<M> channel)
Returns aTransformingSendPort
wrapping the given channel, which may be used for functional transformations.static <M,S1,S2>
ReceivePort<M>zip(ReceivePort<S1> c1, ReceivePort<S2> c2, co.paralleluniverse.common.util.Function2<S1,S2,M> f)
Returns aReceivePort
that combines each vector of messages from a vector of channels into a single combined message.static <M,S1,S2,S3>
ReceivePort<M>zip(ReceivePort<S1> c1, ReceivePort<S2> c2, ReceivePort<S3> c3, co.paralleluniverse.common.util.Function3<S1,S2,S3,M> f)
Returns aReceivePort
that combines each vector of messages from a vector of channels into a single combined message.static <M,S1,S2,S3,S4>
ReceivePort<M>zip(ReceivePort<S1> c1, ReceivePort<S2> c2, ReceivePort<S3> c3, ReceivePort<S4> c4, co.paralleluniverse.common.util.Function4<S1,S2,S3,S4,M> f)
Returns aReceivePort
that combines each vector of messages from a vector of channels into a single combined message.static <M,S1,S2,S3,S4,S5>
ReceivePort<M>zip(ReceivePort<S1> c1, ReceivePort<S2> c2, ReceivePort<S3> c3, ReceivePort<S4> c4, ReceivePort<S5> c5, co.paralleluniverse.common.util.Function5<S1,S2,S3,S4,S5,M> f)
Returns aReceivePort
that combines each vector of messages from a vector of channels into a single combined message.static <M> ReceivePort<M>
zip(java.util.List<? extends ReceivePort<?>> cs, com.google.common.base.Function<java.lang.Object[],M> f)
Returns aReceivePort
that combines each vector of messages from a list of channels into a single combined message.
-
-
-
Method Detail
-
newChannel
public static <Message> Channel<Message> newChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)
Creates a new channel with the given properties.Some combinations of properties are unsupported, and will throw an
An unbounded channel ignores its overflow policy as it never overflows.IllegalArgumentException
if requested:- Type Parameters:
Message
- the type of messages that can be sent to this channel.- Parameters:
bufferSize
- if positive, the number of messages that the channel can hold in an internal buffer;0
for a transfer channel, i.e. a channel with no internal buffer.-1
for a channel with an unbounded (infinite) buffer.policy
- theChannels.OverflowPolicy
specifying how the channel (if bounded) will behave if its internal buffer overflows.singleProducer
- whether the channel will be used by a single producer strand.singleConsumer
- whether the channel will be used by a single consumer strand.- Returns:
- The newly created channel
-
newChannel
public static <Message> Channel<Message> newChannel(int bufferSize, Channels.OverflowPolicy policy)
Creates a new channel with the given mailbox size andChannels.OverflowPolicy
, with other properties set to their default values. Specifically,singleProducer
will be set tofalse
, whilesingleConsumer
will be set totrue
.- Type Parameters:
Message
- the type of messages that can be sent to this channel.- Parameters:
bufferSize
- if positive, the number of messages that the channel can hold in an internal buffer;0
for a transfer channel, i.e. a channel with no internal buffer.-1
for a channel with an unbounded (infinite) buffer.policy
- theChannels.OverflowPolicy
specifying how the channel (if bounded) will behave if its internal buffer overflows.- Returns:
- The newly created channel
- See Also:
newChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newChannel
public static <Message> Channel<Message> newChannel(int bufferSize)
Creates a new channel with the given mailbox size with other properties set to their default values. Specifically, theChannels.OverflowPolicy
will be set toBLOCK
,singleProducer
will be set tofalse
, andsingleConsumer
will be set totrue
.- Type Parameters:
Message
- the type of messages that can be sent to this channel.- Parameters:
bufferSize
- if positive, the number of messages that the channel can hold in an internal buffer;0
for a transfer channel, i.e. a channel with no internal buffer.-1
for a channel with an unbounded (infinite) buffer.- Returns:
- The newly created channel
- See Also:
newChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newIntChannel
public static IntChannel newIntChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)
Creates a new primitiveint
channel with the given properties.Some combinations of properties are unsupported, and will throw an
An unbounded channel ignores its overflow policy as it never overflows.IllegalArgumentException
if requested:- Parameters:
bufferSize
- if positive, the number of messages that the channel can hold in an internal buffer;0
for a transfer channel, i.e. a channel with no internal buffer.-1
for a channel with an unbounded (infinite) buffer.policy
- theChannels.OverflowPolicy
specifying how the channel (if bounded) will behave if its internal buffer overflows.singleProducer
- whether the channel will be used by a single producer strand.singleConsumer
- whether the channel will be used by a single consumer strand. Currently primitive channels only support a single consumer, so this argument must be set tofalse
.- Returns:
- The newly created channel
-
newIntChannel
public static IntChannel newIntChannel(int bufferSize, Channels.OverflowPolicy policy)
Creates a new primitiveint
channel with the given mailbox size andChannels.OverflowPolicy
, with other properties set to their default values. Specifically,singleProducer
will be set tofalse
, whilesingleConsumer
will be set totrue
.- Parameters:
bufferSize
- if positive, the number of messages that the channel can hold in an internal buffer;0
for a transfer channel, i.e. a channel with no internal buffer.-1
for a channel with an unbounded (infinite) buffer.policy
- theChannels.OverflowPolicy
specifying how the channel (if bounded) will behave if its internal buffer overflows.- Returns:
- The newly created channel
- See Also:
newIntChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newIntChannel
public static IntChannel newIntChannel(int bufferSize)
Creates a new primitiveint
channel with the given mailbox size with other properties set to their default values. Specifically, theChannels.OverflowPolicy
will be set toBLOCK
,singleProducer
will be set tofalse
, andsingleConsumer
will be set totrue
.- Parameters:
bufferSize
- if positive, the number of messages that the channel can hold in an internal buffer;0
for a transfer channel, i.e. a channel with no internal buffer.-1
for a channel with an unbounded (infinite) buffer.- Returns:
- The newly created channel
- See Also:
newIntChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newLongChannel
public static LongChannel newLongChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)
Creates a new primitivelong
channel with the given properties.Some combinations of properties are unsupported, and will throw an
An unbounded channel ignores its overflow policy as it never overflows.IllegalArgumentException
if requested:- Parameters:
bufferSize
- if positive, the number of messages that the channel can hold in an internal buffer;0
for a transfer channel, i.e. a channel with no internal buffer.-1
for a channel with an unbounded (infinite) buffer.policy
- theChannels.OverflowPolicy
specifying how the channel (if bounded) will behave if its internal buffer overflows.singleProducer
- whether the channel will be used by a single producer strand.singleConsumer
- whether the channel will be used by a single consumer strand. Currently primitive channels only support a single consumer, so this argument must be set tofalse
.- Returns:
- The newly created channel
-
newLongChannel
public static LongChannel newLongChannel(int bufferSize, Channels.OverflowPolicy policy)
Creates a new primitivelong
channel with the given mailbox size andChannels.OverflowPolicy
, with other properties set to their default values. Specifically,singleProducer
will be set tofalse
, whilesingleConsumer
will be set totrue
.- Parameters:
bufferSize
- if positive, the number of messages that the channel can hold in an internal buffer;0
for a transfer channel, i.e. a channel with no internal buffer.-1
for a channel with an unbounded (infinite) buffer.policy
- theChannels.OverflowPolicy
specifying how the channel (if bounded) will behave if its internal buffer overflows.- Returns:
- The newly created channel
- See Also:
newLongChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newLongChannel
public static LongChannel newLongChannel(int bufferSize)
Creates a new primitivelong
channel with the given mailbox size with other properties set to their default values. Specifically, theChannels.OverflowPolicy
will be set toBLOCK
,singleProducer
will be set tofalse
, andsingleConsumer
will be set totrue
.- Parameters:
bufferSize
- if positive, the number of messages that the channel can hold in an internal buffer;0
for a transfer channel, i.e. a channel with no internal buffer.-1
for a channel with an unbounded (infinite) buffer.- Returns:
- The newly created channel
- See Also:
newLongChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newFloatChannel
public static FloatChannel newFloatChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)
Creates a new primitivefloat
channel with the given properties.Some combinations of properties are unsupported, and will throw an
An unbounded channel ignores its overflow policy as it never overflows.IllegalArgumentException
if requested:- Parameters:
bufferSize
- if positive, the number of messages that the channel can hold in an internal buffer;0
for a transfer channel, i.e. a channel with no internal buffer.-1
for a channel with an unbounded (infinite) buffer.policy
- theChannels.OverflowPolicy
specifying how the channel (if bounded) will behave if its internal buffer overflows.singleProducer
- whether the channel will be used by a single producer strand.singleConsumer
- whether the channel will be used by a single consumer strand. Currently primitive channels only support a single consumer, so this argument must be set tofalse
.- Returns:
- The newly created channel
-
newFloatChannel
public static FloatChannel newFloatChannel(int bufferSize, Channels.OverflowPolicy policy)
Creates a new primitivefloat
channel with the given mailbox size andChannels.OverflowPolicy
, with other properties set to their default values. Specifically,singleProducer
will be set tofalse
, whilesingleConsumer
will be set totrue
.- Parameters:
bufferSize
- if positive, the number of messages that the channel can hold in an internal buffer;0
for a transfer channel, i.e. a channel with no internal buffer.-1
for a channel with an unbounded (infinite) buffer.policy
- theChannels.OverflowPolicy
specifying how the channel (if bounded) will behave if its internal buffer overflows.- Returns:
- The newly created channel
- See Also:
newFloatChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newFloatChannel
public static FloatChannel newFloatChannel(int bufferSize)
Creates a new primitivefloat
channel with the given mailbox size with other properties set to their default values. Specifically, theChannels.OverflowPolicy
will be set toBLOCK
,singleProducer
will be set tofalse
, andsingleConsumer
will be set totrue
.- Parameters:
bufferSize
- if positive, the number of messages that the channel can hold in an internal buffer;0
for a transfer channel, i.e. a channel with no internal buffer.-1
for a channel with an unbounded (infinite) buffer.- Returns:
- The newly created channel
- See Also:
newFloatChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newDoubleChannel
public static DoubleChannel newDoubleChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)
Creates a new primitivedouble
channel with the given properties.Some combinations of properties are unsupported, and will throw an
An unbounded channel ignores its overflow policy as it never overflows.IllegalArgumentException
if requested:- Parameters:
bufferSize
- if positive, the number of messages that the channel can hold in an internal buffer;0
for a transfer channel, i.e. a channel with no internal buffer.-1
for a channel with an unbounded (infinite) buffer.policy
- theChannels.OverflowPolicy
specifying how the channel (if bounded) will behave if its internal buffer overflows.singleProducer
- whether the channel will be used by a single producer strand.singleConsumer
- whether the channel will be used by a single consumer strand. Currently primitive channels only support a single consumer, so this argument must be set tofalse
.- Returns:
- The newly created channel
-
newDoubleChannel
public static DoubleChannel newDoubleChannel(int bufferSize, Channels.OverflowPolicy policy)
Creates a new primitivedouble
channel with the given mailbox size andChannels.OverflowPolicy
, with other properties set to their default values. Specifically,singleProducer
will be set tofalse
, whilesingleConsumer
will be set totrue
.- Parameters:
bufferSize
- if positive, the number of messages that the channel can hold in an internal buffer;0
for a transfer channel, i.e. a channel with no internal buffer.-1
for a channel with an unbounded (infinite) buffer.policy
- theChannels.OverflowPolicy
specifying how the channel (if bounded) will behave if its internal buffer overflows.- Returns:
- The newly created channel
- See Also:
newDoubleChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newDoubleChannel
public static DoubleChannel newDoubleChannel(int bufferSize)
Creates a new primitivedouble
channel with the given mailbox size with other properties set to their default values. Specifically, theChannels.OverflowPolicy
will be set toBLOCK
,singleProducer
will be set tofalse
, andsingleConsumer
will be set totrue
.- Parameters:
bufferSize
- if positive, the number of messages that the channel can hold in an internal buffer;0
for a transfer channel, i.e. a channel with no internal buffer.-1
for a channel with an unbounded (infinite) buffer.- Returns:
- The newly created channel
- See Also:
newDoubleChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
isTickerChannel
public static boolean isTickerChannel(ReceivePort<?> channel)
Tests whether a given channel is a ticker channel, namely a channel with a bounded buffer and anoverflow policy
ofDISPLACE
. A ticker channel can be passed to one of thenewTickerConsumerFor
methods.- Parameters:
channel
- the channel- Returns:
true
if the channel is a ticker,false
otherwise
-
newTickerConsumerFor
public static <Message> ReceivePort<Message> newTickerConsumerFor(Channel<Message> channel)
Creates aReceivePort
that can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACE
overflow policy. Each ticker consumer will yield monotonic messages, namely no message will be received more than once, and the messages will be received in the order they're sent, but if the consumer is too slow, messages could be lost.- Type Parameters:
Message
- the message type- Parameters:
channel
- a channel of bounded capacity and theDISPLACE
overflow policy.- Returns:
- a new
ReceivePort
which provides a view to the supplied ticker channel.
-
newTickerConsumerFor
public static IntReceivePort newTickerConsumerFor(IntChannel channel)
Creates anIntReceivePort
that can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACE
overflow policy. Each ticker consumer will yield monotonic messages, namely no message will be received more than once, and the messages will be received in the order they're sent, but if the consumer is too slow, messages could be lost.- Parameters:
channel
- anint
channel of bounded capacity and theDISPLACE
overflow policy.- Returns:
- a new
IntReceivePort
which provides a view to the supplied ticker channel.
-
newTickerConsumerFor
public static LongReceivePort newTickerConsumerFor(LongChannel channel)
Creates aLongReceivePort
that can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACE
overflow policy. Each ticker consumer will yield monotonic messages, namely no message will be received more than once, and the messages will be received in the order they're sent, but if the consumer is too slow, messages could be lost.- Parameters:
channel
- along
channel of bounded capacity and theDISPLACE
overflow policy.- Returns:
- a new
LongReceivePort
which provides a view to the supplied ticker channel.
-
newTickerConsumerFor
public static FloatReceivePort newTickerConsumerFor(FloatChannel channel)
Creates aFloatReceivePort
that can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACE
overflow policy. Each ticker consumer will yield monotonic messages, namely no message will be received more than once, and the messages will be received in the order they're sent, but if the consumer is too slow, messages could be lost.- Parameters:
channel
- afloat
channel of bounded capacity and theDISPLACE
overflow policy.- Returns:
- a new
FloatReceivePort
which provides a view to the supplied ticker channel.
-
newTickerConsumerFor
public static DoubleReceivePort newTickerConsumerFor(DoubleChannel channel)
Creates aDoubleReceivePort
that can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACE
overflow policy. Each ticker consumer will yield monotonic messages, namely no message will be received more than once, and the messages will be received in the order they're sent, but if the consumer is too slow, messages could be lost.- Parameters:
channel
- adouble
channel of bounded capacity and theDISPLACE
overflow policy.- Returns:
- a new
DoubleReceivePort
which provides a view to the supplied ticker channel.
-
fiberTransform
public static <S,T> void fiberTransform(FiberFactory fiberFactory, ReceivePort<S> in, SendPort<T> out, SuspendableAction2<? extends ReceivePort<? super S>,? extends SendPort<? extends T>> transformer)
Spawns a fiber that transforms values read from thein
channel and writes values to theout
channel.- Type Parameters:
S
- the message type of the input (source) channel.T
- the message type of the output (target) channel.- Parameters:
fiberFactory
- will be used to create the fiberin
- the input channelout
- the output channeltransformer
- the transforming operation
-
fiberTransform
public static <S,T> void fiberTransform(ReceivePort<S> in, SendPort<T> out, SuspendableAction2<? extends ReceivePort<? super S>,? extends SendPort<? extends T>> transformer)
Spawns a fiber that transforms values read from thein
channel and writes values to theout
channel.When the transformation terminates. the output channel is automatically closed. If the transformation terminates abnormally (throws an exception), the output channel is
closed with that exception
.- Type Parameters:
S
- the message type of the input (source) channel.T
- the message type of the output (target) channel.- Parameters:
in
- the input channelout
- the output channeltransformer
- the transforming operation
-
group
public static <M> ReceivePort<M> group(ReceivePort<? extends M>... channels)
Returns aReceivePort
that receives messages from a set of channels. Messages from all given channels are funneled into the returned channel.- Type Parameters:
M
- the message type of the receive ports- Parameters:
channels
- the receive ports- Returns:
- a
ReceivePort
that receives messages fromchannels
.
-
group
public static <M> ReceivePort<M> group(java.util.Collection<? extends ReceivePort<? extends M>> channels)
Returns aReceivePort
that receives messages from a set of channels. Messages from all given channels are funneled into the returned channel.- Type Parameters:
M
- the message type of the receive ports- Parameters:
channels
- the receive ports- Returns:
- a
ReceivePort
that receives messages fromchannels
.
-
mix
public static <M> Mix<? extends M> mix(ReceivePort<? extends M>... channels)
Returns aMix
that receives messages from a set of channels. Messages from all given channels are funneled into the returned channel.- Type Parameters:
M
- the message type of the receive ports- Parameters:
channels
- the receive ports- Returns:
- a
ReceivePort
that receives messages fromchannels
.
-
mix
public static <M> Mix<? extends M> mix(java.util.Collection<? extends ReceivePort<? extends M>> channels)
Returns aMix
that receives messages from a set of channels. Messages from all given channels are funneled into the returned channel.- Type Parameters:
M
- the message type of the receive ports- Parameters:
channels
- the receive ports- Returns:
- a
ReceivePort
that receives messages fromchannels
.
-
filter
public static <M> ReceivePort<M> filter(ReceivePort<M> channel, com.google.common.base.Predicate<M> pred)
Returns aReceivePort
that filters messages that satisfy a predicate from a given channel. All messages (even those not satisfying the predicate) will be consumed from the original channel; those that don't satisfy the predicate will be silently discarded.The returned
ReceivePort
has the samehashCode
aschannel
and isequal
to it.- Type Parameters:
M
- the message type.- Parameters:
channel
- The channel to filterpred
- the filtering predicate- Returns:
- A
ReceivePort
that will receive all those messages from the original channel which satisfy the predicate (i.e. the predicate returnstrue
).
-
map
public static <S,T> ReceivePort<T> map(ReceivePort<S> channel, com.google.common.base.Function<S,T> f)
Returns aReceivePort
that receives messages that are transformed by a given mapping function from a given channel.The returned
ReceivePort
has the samehashCode
aschannel
and isequal
to it.- Type Parameters:
S
- the message type of the source (given) channel.T
- the message type of the target (returned) channel.- Parameters:
channel
- the channel to transformf
- the mapping function- Returns:
- a
ReceivePort
that returns messages that are the result of applying the mapping function to the messages received on the given channel.
-
reduce
public static <S,T> ReceivePort<T> reduce(ReceivePort<S> channel, co.paralleluniverse.common.util.Function2<T,S,T> f, T init)
Returns aReceivePort
providing messages that are transformed from a given channel by a given reduction function.The returned
ReceivePort
has the samehashCode
aschannel
and isequal
to it.- Type Parameters:
S
- The message type of the source (given) channel.T
- The message type of the target (returned) channel.- Parameters:
channel
- The channel to transform.f
- The reduction function.init
- The initial input to the reduction function.- Returns:
- a
ReceivePort
that returns messages that are the result of applying the reduction function to the messages received on the given channel.
-
mapErrors
public static <T> ReceivePort<T> mapErrors(ReceivePort<T> channel, com.google.common.base.Function<java.lang.Exception,T> f)
Returns aReceivePort
that maps exceptions thrown by the underlying channel (by channel transformations, or as a result ofSendPort.close(Throwable)
) into messages.The returned
ReceivePort
has the samehashCode
aschannel
and isequal
to it.- Type Parameters:
T
- the message type of the target (returned) channel.- Parameters:
channel
- the channel to transformf
- the exception mapping function- Returns:
- a
ReceivePort
that maps exceptions thrown by the given channel
-
flatMap
public static <S,T> ReceivePort<T> flatMap(ReceivePort<S> channel, com.google.common.base.Function<S,ReceivePort<T>> f)
Returns aReceivePort
that receives messages that are transformed by a given flat-mapping function from a given channel. Unlikemap
, the mapping function does not returns a single output message for every input message, but a newReceivePort
. All the returned ports are concatenated into a singleReceivePort
that receives the messages received by all the ports in order.To return a single value the mapping function can make use of
singletonReceivePort
. To return a collection, it can make use oftoReceivePort(Iterable)
. To emit no values, the function can returnemptyReceivePort()
ornull
.The returned
ReceivePort
can only be safely used by a single receiver strand.The returned
ReceivePort
has the samehashCode
aschannel
and isequal
to it.- Type Parameters:
S
- the message type of the source (given) channel.T
- the message type of the target (returned) channel.- Parameters:
channel
- the channel to transformf
- the mapping function- Returns:
- a
ReceivePort
that returns messages that are the result of applying the mapping function to the messages received on the given channel.
-
forEach
public static <T> void forEach(ReceivePort<T> channel, SuspendableAction1<T> action) throws SuspendExecution, java.lang.InterruptedException
Performs the given action on each message received by the given channel. This method returns only after all messages have been consumed and the channel has been closed.- Type Parameters:
T
- the message type- Parameters:
channel
- the channelaction
- the actions- Throws:
java.lang.InterruptedException
SuspendExecution
-
take
public static <T> ReceivePort<T> take(ReceivePort<T> channel, long count)
- Type Parameters:
T
- The message type.- Parameters:
channel
- The channel.count
- The maximum number of messages extracted from the underlying channel.- Returns:
- a
ReceivePort
that can provide at mostcount
messages fromchannel
.
-
zip
public static <M> ReceivePort<M> zip(java.util.List<? extends ReceivePort<?>> cs, com.google.common.base.Function<java.lang.Object[],M> f)
Returns aReceivePort
that combines each vector of messages from a list of channels into a single combined message.- Type Parameters:
M
- The type of the combined message- Parameters:
f
- The combining functioncs
- A vector of channels- Returns:
- A zipping
ReceivePort
-
zip
public static <M,S1,S2> ReceivePort<M> zip(ReceivePort<S1> c1, ReceivePort<S2> c2, co.paralleluniverse.common.util.Function2<S1,S2,M> f)
Returns aReceivePort
that combines each vector of messages from a vector of channels into a single combined message.- Type Parameters:
M
- The type of the combined messageS1
- The message type of the first input portS2
- The message type of the second input port- Parameters:
f
- The combining functionc1
- The first input portc2
- The second input port- Returns:
- A zipping
ReceivePort
-
zip
public static <M,S1,S2,S3> ReceivePort<M> zip(ReceivePort<S1> c1, ReceivePort<S2> c2, ReceivePort<S3> c3, co.paralleluniverse.common.util.Function3<S1,S2,S3,M> f)
Returns aReceivePort
that combines each vector of messages from a vector of channels into a single combined message.- Type Parameters:
M
- The type of the combined messageS1
- The message type of the first input portS2
- The message type of the second input portS3
- The message type of the third input port- Parameters:
f
- The combining functionc1
- The first input portc2
- The second input portc3
- The third input port- Returns:
- A zipping
ReceivePort
-
zip
public static <M,S1,S2,S3,S4> ReceivePort<M> zip(ReceivePort<S1> c1, ReceivePort<S2> c2, ReceivePort<S3> c3, ReceivePort<S4> c4, co.paralleluniverse.common.util.Function4<S1,S2,S3,S4,M> f)
Returns aReceivePort
that combines each vector of messages from a vector of channels into a single combined message.- Type Parameters:
M
- The type of the combined messageS1
- The message type of the first input portS2
- The message type of the second input portS3
- The message type of the third input portS4
- The message type of the fourth input port- Parameters:
f
- The combining functionc1
- The first input portc2
- The second input portc3
- The third input portc4
- The fourth input port- Returns:
- A zipping
ReceivePort
-
zip
public static <M,S1,S2,S3,S4,S5> ReceivePort<M> zip(ReceivePort<S1> c1, ReceivePort<S2> c2, ReceivePort<S3> c3, ReceivePort<S4> c4, ReceivePort<S5> c5, co.paralleluniverse.common.util.Function5<S1,S2,S3,S4,S5,M> f)
Returns aReceivePort
that combines each vector of messages from a vector of channels into a single combined message.- Type Parameters:
M
- The type of the combined messageS1
- The message type of the first input portS2
- The message type of the second input portS3
- The message type of the third input portS4
- The message type of the fourth input portS5
- The message type of the fifth input port- Parameters:
f
- The combining functionc1
- The first input portc2
- The second input portc3
- The third input portc4
- The fourth input portc5
- The fifth input port- Returns:
- A zipping
ReceivePort
-
transform
public static <M> TransformingReceivePort<M> transform(ReceivePort<M> channel)
Returns aTransformingReceivePort
wrapping the given channel, which may be used for functional transformations.- Type Parameters:
M
- the message type- Parameters:
channel
- the channel to transform- Returns:
- the transformed
ReceivePort
-
filterSend
public static <M> SendPort<M> filterSend(SendPort<M> channel, com.google.common.base.Predicate<M> pred)
Returns aSendPort
that filters messages that satisfy a predicate before sending to a given channel. Messages that don't satisfy the predicate will be silently discarded when sent.The returned
SendPort
has the samehashCode
aschannel
and isequal
to it.- Type Parameters:
M
- the message type.- Parameters:
channel
- The channel to filterpred
- the filtering predicate- Returns:
- A
SendPort
that will send only those messages which satisfy the predicate (i.e. the predicate returnstrue
) to the given channel.
-
mapSend
public static <S,T> SendPort<S> mapSend(SendPort<T> channel, com.google.common.base.Function<S,T> f)
Returns aSendPort
that transforms messages by applying a given mapping function before sending them to a given channel.The returned
SendPort
has the samehashCode
aschannel
and isequal
to it.- Type Parameters:
S
- the message type of the source (returned) channel.T
- the message type of the target (given) channel.- Parameters:
channel
- the channel to transformf
- the mapping function- Returns:
- a
SendPort
that passes messages to the given channel after transforming them by applying the mapping function.
-
reduceSend
public static <S,T> SendPort<S> reduceSend(SendPort<T> channel, co.paralleluniverse.common.util.Function2<T,S,T> f, T init)
Returns aSendPort
accepting messages that are transformed by a reduction function.The returned
SendPort
has the samehashCode
aschannel
and isequal
to it.- Type Parameters:
S
- The message type of the source (returned) channel.T
- The message type of the target (given) channel.- Parameters:
channel
- The channel to transform.f
- The reduction function.init
- The initial input to the reduction function.- Returns:
- a
ReceivePort
that returns messages that are the result of applying the reduction function to the messages received on the given channel.
-
flatMapSend
public static <S,T> SendPort<S> flatMapSend(FiberFactory fiberFactory, Channel<S> pipe, SendPort<T> channel, com.google.common.base.Function<S,ReceivePort<T>> f)
Returns aSendPort
that sends messages that are transformed by a given flat-mapping function into a given channel. Unlikemap
, the mapping function does not returns a single output message for every input message, but a newReceivePort
. All the returned ports are concatenated and sent to the channel.To return a single value the mapping function can make use of
singletonReceivePort
. To return a collection, it can make use oftoReceivePort(Iterable)
. To emit no values, the function can returnemptyReceivePort()
ornull
.If multiple producers send messages into the channel, the messages from the
ReceivePort
s returned by the mapping function may be interleaved with other messages.The returned
SendPort
has the samehashCode
aschannel
and isequal
to it.- Type Parameters:
S
- the message type of the source (given) channel.T
- the message type of the target (returned) channel.- Parameters:
pipe
- an intermediate channel used in the flat-mapping operation. Messages are first sent to this channel before being transformed.channel
- the channel to transformf
- the mapping function- Returns:
- a
ReceivePort
that returns messages that are the result of applying the mapping function to the messages received on the given channel.
-
flatMapSend
public static <S,T> SendPort<S> flatMapSend(Channel<S> pipe, SendPort<T> channel, com.google.common.base.Function<S,ReceivePort<T>> f)
-
transformSend
public static <M> TransformingSendPort<M> transformSend(SendPort<M> channel)
Returns aTransformingSendPort
wrapping the given channel, which may be used for functional transformations.- Type Parameters:
M
- the message type- Parameters:
channel
- the channel to transform- Returns:
- the transformed
SendPort
-
emptyReceivePort
public static <T> ReceivePort<T> emptyReceivePort()
Returns an emptyReceivePort
. The port is closed and receives no messages;
-
singletonReceivePort
public static <T> ReceivePort<T> singletonReceivePort(T object)
Returns a newly createdReceivePort
that receives a single message: the object given to the function.- Type Parameters:
T
-- Parameters:
object
- the single object that will be returned by theReceivePort
.
-
toReceivePort
public static <T> ReceivePort<T> toReceivePort(java.util.Iterator<T> iterator)
Returns a newly createdReceivePort
that receives all the elements iterated by the iterator.- Type Parameters:
T
-- Parameters:
iterator
- the iterator to transform into aReceivePort
.
-
toReceivePort
public static <T> ReceivePort<T> toReceivePort(java.lang.Iterable<T> iterable)
Returns a newly createdReceivePort
that receives all the elements iterated by the iterable.- Type Parameters:
T
-- Parameters:
iterable
- the iterable to transform into aReceivePort
.
-
-