Class Channels
- java.lang.Object
-
- co.paralleluniverse.strands.channels.Channels
-
public final class Channels extends java.lang.ObjectA utility class for creating and manipulating channels.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static classChannels.OverflowPolicyDetermines how a channel behaves when its internal buffer (if it has one) overflows.
-
Method Summary
All Methods Static Methods Concrete Methods Modifier and Type Method Description static <T> ReceivePort<T>emptyReceivePort()Returns an emptyReceivePort.static <S,T>
voidfiberTransform(FiberFactory fiberFactory, ReceivePort<S> in, SendPort<T> out, SuspendableAction2<? extends ReceivePort<? super S>,? extends SendPort<? extends T>> transformer)Spawns a fiber that transforms values read from theinchannel and writes values to theoutchannel.static <S,T>
voidfiberTransform(ReceivePort<S> in, SendPort<T> out, SuspendableAction2<? extends ReceivePort<? super S>,? extends SendPort<? extends T>> transformer)Spawns a fiber that transforms values read from theinchannel and writes values to theoutchannel.static <M> ReceivePort<M>filter(ReceivePort<M> channel, com.google.common.base.Predicate<M> pred)Returns aReceivePortthat filters messages that satisfy a predicate from a given channel.static <M> SendPort<M>filterSend(SendPort<M> channel, com.google.common.base.Predicate<M> pred)Returns aSendPortthat filters messages that satisfy a predicate before sending to a given channel.static <S,T>
ReceivePort<T>flatMap(ReceivePort<S> channel, com.google.common.base.Function<S,ReceivePort<T>> f)Returns aReceivePortthat receives messages that are transformed by a given flat-mapping function from a given channel.static <S,T>
SendPort<S>flatMapSend(FiberFactory fiberFactory, Channel<S> pipe, SendPort<T> channel, com.google.common.base.Function<S,ReceivePort<T>> f)Returns aSendPortthat sends messages that are transformed by a given flat-mapping function into a given channel.static <S,T>
SendPort<S>flatMapSend(Channel<S> pipe, SendPort<T> channel, com.google.common.base.Function<S,ReceivePort<T>> f)static <T> voidforEach(ReceivePort<T> channel, SuspendableAction1<T> action)Performs the given action on each message received by the given channel.static <M> ReceivePort<M>group(ReceivePort<? extends M>... channels)Returns aReceivePortthat receives messages from a set of channels.static <M> ReceivePort<M>group(java.util.Collection<? extends ReceivePort<? extends M>> channels)Returns aReceivePortthat receives messages from a set of channels.static booleanisTickerChannel(ReceivePort<?> channel)Tests whether a given channel is a ticker channel, namely a channel with a bounded buffer and anoverflow policyofDISPLACE.static <S,T>
ReceivePort<T>map(ReceivePort<S> channel, com.google.common.base.Function<S,T> f)Returns aReceivePortthat receives messages that are transformed by a given mapping function from a given channel.static <T> ReceivePort<T>mapErrors(ReceivePort<T> channel, com.google.common.base.Function<java.lang.Exception,T> f)Returns aReceivePortthat maps exceptions thrown by the underlying channel (by channel transformations, or as a result ofSendPort.close(Throwable)) into messages.static <S,T>
SendPort<S>mapSend(SendPort<T> channel, com.google.common.base.Function<S,T> f)Returns aSendPortthat transforms messages by applying a given mapping function before sending them to a given channel.static <M> Mix<? extends M>mix(ReceivePort<? extends M>... channels)Returns aMixthat receives messages from a set of channels.static <M> Mix<? extends M>mix(java.util.Collection<? extends ReceivePort<? extends M>> channels)Returns aMixthat receives messages from a set of channels.static <Message> Channel<Message>newChannel(int bufferSize)Creates a new channel with the given mailbox size with other properties set to their default values.static <Message> Channel<Message>newChannel(int bufferSize, Channels.OverflowPolicy policy)Creates a new channel with the given mailbox size andChannels.OverflowPolicy, with other properties set to their default values.static <Message> Channel<Message>newChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)Creates a new channel with the given properties.static DoubleChannelnewDoubleChannel(int bufferSize)Creates a new primitivedoublechannel with the given mailbox size with other properties set to their default values.static DoubleChannelnewDoubleChannel(int bufferSize, Channels.OverflowPolicy policy)Creates a new primitivedoublechannel with the given mailbox size andChannels.OverflowPolicy, with other properties set to their default values.static DoubleChannelnewDoubleChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)Creates a new primitivedoublechannel with the given properties.static FloatChannelnewFloatChannel(int bufferSize)Creates a new primitivefloatchannel with the given mailbox size with other properties set to their default values.static FloatChannelnewFloatChannel(int bufferSize, Channels.OverflowPolicy policy)Creates a new primitivefloatchannel with the given mailbox size andChannels.OverflowPolicy, with other properties set to their default values.static FloatChannelnewFloatChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)Creates a new primitivefloatchannel with the given properties.static IntChannelnewIntChannel(int bufferSize)Creates a new primitiveintchannel with the given mailbox size with other properties set to their default values.static IntChannelnewIntChannel(int bufferSize, Channels.OverflowPolicy policy)Creates a new primitiveintchannel with the given mailbox size andChannels.OverflowPolicy, with other properties set to their default values.static IntChannelnewIntChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)Creates a new primitiveintchannel with the given properties.static LongChannelnewLongChannel(int bufferSize)Creates a new primitivelongchannel with the given mailbox size with other properties set to their default values.static LongChannelnewLongChannel(int bufferSize, Channels.OverflowPolicy policy)Creates a new primitivelongchannel with the given mailbox size andChannels.OverflowPolicy, with other properties set to their default values.static LongChannelnewLongChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)Creates a new primitivelongchannel with the given properties.static <Message> ReceivePort<Message>newTickerConsumerFor(Channel<Message> channel)Creates aReceivePortthat can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACEoverflow policy.static DoubleReceivePortnewTickerConsumerFor(DoubleChannel channel)Creates aDoubleReceivePortthat can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACEoverflow policy.static FloatReceivePortnewTickerConsumerFor(FloatChannel channel)Creates aFloatReceivePortthat can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACEoverflow policy.static IntReceivePortnewTickerConsumerFor(IntChannel channel)Creates anIntReceivePortthat can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACEoverflow policy.static LongReceivePortnewTickerConsumerFor(LongChannel channel)Creates aLongReceivePortthat can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACEoverflow policy.static <S,T>
ReceivePort<T>reduce(ReceivePort<S> channel, co.paralleluniverse.common.util.Function2<T,S,T> f, T init)Returns aReceivePortproviding messages that are transformed from a given channel by a given reduction function.static <S,T>
SendPort<S>reduceSend(SendPort<T> channel, co.paralleluniverse.common.util.Function2<T,S,T> f, T init)Returns aSendPortaccepting messages that are transformed by a reduction function.static <T> ReceivePort<T>singletonReceivePort(T object)Returns a newly createdReceivePortthat receives a single message: the object given to the function.static <T> ReceivePort<T>take(ReceivePort<T> channel, long count)static <T> ReceivePort<T>toReceivePort(java.lang.Iterable<T> iterable)Returns a newly createdReceivePortthat receives all the elements iterated by the iterable.static <T> ReceivePort<T>toReceivePort(java.util.Iterator<T> iterator)Returns a newly createdReceivePortthat receives all the elements iterated by the iterator.static <M> TransformingReceivePort<M>transform(ReceivePort<M> channel)Returns aTransformingReceivePortwrapping the given channel, which may be used for functional transformations.static <M> TransformingSendPort<M>transformSend(SendPort<M> channel)Returns aTransformingSendPortwrapping the given channel, which may be used for functional transformations.static <M,S1,S2>
ReceivePort<M>zip(ReceivePort<S1> c1, ReceivePort<S2> c2, co.paralleluniverse.common.util.Function2<S1,S2,M> f)Returns aReceivePortthat combines each vector of messages from a vector of channels into a single combined message.static <M,S1,S2,S3>
ReceivePort<M>zip(ReceivePort<S1> c1, ReceivePort<S2> c2, ReceivePort<S3> c3, co.paralleluniverse.common.util.Function3<S1,S2,S3,M> f)Returns aReceivePortthat combines each vector of messages from a vector of channels into a single combined message.static <M,S1,S2,S3,S4>
ReceivePort<M>zip(ReceivePort<S1> c1, ReceivePort<S2> c2, ReceivePort<S3> c3, ReceivePort<S4> c4, co.paralleluniverse.common.util.Function4<S1,S2,S3,S4,M> f)Returns aReceivePortthat combines each vector of messages from a vector of channels into a single combined message.static <M,S1,S2,S3,S4,S5>
ReceivePort<M>zip(ReceivePort<S1> c1, ReceivePort<S2> c2, ReceivePort<S3> c3, ReceivePort<S4> c4, ReceivePort<S5> c5, co.paralleluniverse.common.util.Function5<S1,S2,S3,S4,S5,M> f)Returns aReceivePortthat combines each vector of messages from a vector of channels into a single combined message.static <M> ReceivePort<M>zip(java.util.List<? extends ReceivePort<?>> cs, com.google.common.base.Function<java.lang.Object[],M> f)Returns aReceivePortthat combines each vector of messages from a list of channels into a single combined message.
-
-
-
Method Detail
-
newChannel
public static <Message> Channel<Message> newChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)
Creates a new channel with the given properties.Some combinations of properties are unsupported, and will throw an
An unbounded channel ignores its overflow policy as it never overflows.IllegalArgumentExceptionif requested:- Type Parameters:
Message- the type of messages that can be sent to this channel.- Parameters:
bufferSize- if positive, the number of messages that the channel can hold in an internal buffer;0for a transfer channel, i.e. a channel with no internal buffer.-1for a channel with an unbounded (infinite) buffer.policy- theChannels.OverflowPolicyspecifying how the channel (if bounded) will behave if its internal buffer overflows.singleProducer- whether the channel will be used by a single producer strand.singleConsumer- whether the channel will be used by a single consumer strand.- Returns:
- The newly created channel
-
newChannel
public static <Message> Channel<Message> newChannel(int bufferSize, Channels.OverflowPolicy policy)
Creates a new channel with the given mailbox size andChannels.OverflowPolicy, with other properties set to their default values. Specifically,singleProducerwill be set tofalse, whilesingleConsumerwill be set totrue.- Type Parameters:
Message- the type of messages that can be sent to this channel.- Parameters:
bufferSize- if positive, the number of messages that the channel can hold in an internal buffer;0for a transfer channel, i.e. a channel with no internal buffer.-1for a channel with an unbounded (infinite) buffer.policy- theChannels.OverflowPolicyspecifying how the channel (if bounded) will behave if its internal buffer overflows.- Returns:
- The newly created channel
- See Also:
newChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newChannel
public static <Message> Channel<Message> newChannel(int bufferSize)
Creates a new channel with the given mailbox size with other properties set to their default values. Specifically, theChannels.OverflowPolicywill be set toBLOCK,singleProducerwill be set tofalse, andsingleConsumerwill be set totrue.- Type Parameters:
Message- the type of messages that can be sent to this channel.- Parameters:
bufferSize- if positive, the number of messages that the channel can hold in an internal buffer;0for a transfer channel, i.e. a channel with no internal buffer.-1for a channel with an unbounded (infinite) buffer.- Returns:
- The newly created channel
- See Also:
newChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newIntChannel
public static IntChannel newIntChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)
Creates a new primitiveintchannel with the given properties.Some combinations of properties are unsupported, and will throw an
An unbounded channel ignores its overflow policy as it never overflows.IllegalArgumentExceptionif requested:- Parameters:
bufferSize- if positive, the number of messages that the channel can hold in an internal buffer;0for a transfer channel, i.e. a channel with no internal buffer.-1for a channel with an unbounded (infinite) buffer.policy- theChannels.OverflowPolicyspecifying how the channel (if bounded) will behave if its internal buffer overflows.singleProducer- whether the channel will be used by a single producer strand.singleConsumer- whether the channel will be used by a single consumer strand. Currently primitive channels only support a single consumer, so this argument must be set tofalse.- Returns:
- The newly created channel
-
newIntChannel
public static IntChannel newIntChannel(int bufferSize, Channels.OverflowPolicy policy)
Creates a new primitiveintchannel with the given mailbox size andChannels.OverflowPolicy, with other properties set to their default values. Specifically,singleProducerwill be set tofalse, whilesingleConsumerwill be set totrue.- Parameters:
bufferSize- if positive, the number of messages that the channel can hold in an internal buffer;0for a transfer channel, i.e. a channel with no internal buffer.-1for a channel with an unbounded (infinite) buffer.policy- theChannels.OverflowPolicyspecifying how the channel (if bounded) will behave if its internal buffer overflows.- Returns:
- The newly created channel
- See Also:
newIntChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newIntChannel
public static IntChannel newIntChannel(int bufferSize)
Creates a new primitiveintchannel with the given mailbox size with other properties set to their default values. Specifically, theChannels.OverflowPolicywill be set toBLOCK,singleProducerwill be set tofalse, andsingleConsumerwill be set totrue.- Parameters:
bufferSize- if positive, the number of messages that the channel can hold in an internal buffer;0for a transfer channel, i.e. a channel with no internal buffer.-1for a channel with an unbounded (infinite) buffer.- Returns:
- The newly created channel
- See Also:
newIntChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newLongChannel
public static LongChannel newLongChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)
Creates a new primitivelongchannel with the given properties.Some combinations of properties are unsupported, and will throw an
An unbounded channel ignores its overflow policy as it never overflows.IllegalArgumentExceptionif requested:- Parameters:
bufferSize- if positive, the number of messages that the channel can hold in an internal buffer;0for a transfer channel, i.e. a channel with no internal buffer.-1for a channel with an unbounded (infinite) buffer.policy- theChannels.OverflowPolicyspecifying how the channel (if bounded) will behave if its internal buffer overflows.singleProducer- whether the channel will be used by a single producer strand.singleConsumer- whether the channel will be used by a single consumer strand. Currently primitive channels only support a single consumer, so this argument must be set tofalse.- Returns:
- The newly created channel
-
newLongChannel
public static LongChannel newLongChannel(int bufferSize, Channels.OverflowPolicy policy)
Creates a new primitivelongchannel with the given mailbox size andChannels.OverflowPolicy, with other properties set to their default values. Specifically,singleProducerwill be set tofalse, whilesingleConsumerwill be set totrue.- Parameters:
bufferSize- if positive, the number of messages that the channel can hold in an internal buffer;0for a transfer channel, i.e. a channel with no internal buffer.-1for a channel with an unbounded (infinite) buffer.policy- theChannels.OverflowPolicyspecifying how the channel (if bounded) will behave if its internal buffer overflows.- Returns:
- The newly created channel
- See Also:
newLongChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newLongChannel
public static LongChannel newLongChannel(int bufferSize)
Creates a new primitivelongchannel with the given mailbox size with other properties set to their default values. Specifically, theChannels.OverflowPolicywill be set toBLOCK,singleProducerwill be set tofalse, andsingleConsumerwill be set totrue.- Parameters:
bufferSize- if positive, the number of messages that the channel can hold in an internal buffer;0for a transfer channel, i.e. a channel with no internal buffer.-1for a channel with an unbounded (infinite) buffer.- Returns:
- The newly created channel
- See Also:
newLongChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newFloatChannel
public static FloatChannel newFloatChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)
Creates a new primitivefloatchannel with the given properties.Some combinations of properties are unsupported, and will throw an
An unbounded channel ignores its overflow policy as it never overflows.IllegalArgumentExceptionif requested:- Parameters:
bufferSize- if positive, the number of messages that the channel can hold in an internal buffer;0for a transfer channel, i.e. a channel with no internal buffer.-1for a channel with an unbounded (infinite) buffer.policy- theChannels.OverflowPolicyspecifying how the channel (if bounded) will behave if its internal buffer overflows.singleProducer- whether the channel will be used by a single producer strand.singleConsumer- whether the channel will be used by a single consumer strand. Currently primitive channels only support a single consumer, so this argument must be set tofalse.- Returns:
- The newly created channel
-
newFloatChannel
public static FloatChannel newFloatChannel(int bufferSize, Channels.OverflowPolicy policy)
Creates a new primitivefloatchannel with the given mailbox size andChannels.OverflowPolicy, with other properties set to their default values. Specifically,singleProducerwill be set tofalse, whilesingleConsumerwill be set totrue.- Parameters:
bufferSize- if positive, the number of messages that the channel can hold in an internal buffer;0for a transfer channel, i.e. a channel with no internal buffer.-1for a channel with an unbounded (infinite) buffer.policy- theChannels.OverflowPolicyspecifying how the channel (if bounded) will behave if its internal buffer overflows.- Returns:
- The newly created channel
- See Also:
newFloatChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newFloatChannel
public static FloatChannel newFloatChannel(int bufferSize)
Creates a new primitivefloatchannel with the given mailbox size with other properties set to their default values. Specifically, theChannels.OverflowPolicywill be set toBLOCK,singleProducerwill be set tofalse, andsingleConsumerwill be set totrue.- Parameters:
bufferSize- if positive, the number of messages that the channel can hold in an internal buffer;0for a transfer channel, i.e. a channel with no internal buffer.-1for a channel with an unbounded (infinite) buffer.- Returns:
- The newly created channel
- See Also:
newFloatChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newDoubleChannel
public static DoubleChannel newDoubleChannel(int bufferSize, Channels.OverflowPolicy policy, boolean singleProducer, boolean singleConsumer)
Creates a new primitivedoublechannel with the given properties.Some combinations of properties are unsupported, and will throw an
An unbounded channel ignores its overflow policy as it never overflows.IllegalArgumentExceptionif requested:- Parameters:
bufferSize- if positive, the number of messages that the channel can hold in an internal buffer;0for a transfer channel, i.e. a channel with no internal buffer.-1for a channel with an unbounded (infinite) buffer.policy- theChannels.OverflowPolicyspecifying how the channel (if bounded) will behave if its internal buffer overflows.singleProducer- whether the channel will be used by a single producer strand.singleConsumer- whether the channel will be used by a single consumer strand. Currently primitive channels only support a single consumer, so this argument must be set tofalse.- Returns:
- The newly created channel
-
newDoubleChannel
public static DoubleChannel newDoubleChannel(int bufferSize, Channels.OverflowPolicy policy)
Creates a new primitivedoublechannel with the given mailbox size andChannels.OverflowPolicy, with other properties set to their default values. Specifically,singleProducerwill be set tofalse, whilesingleConsumerwill be set totrue.- Parameters:
bufferSize- if positive, the number of messages that the channel can hold in an internal buffer;0for a transfer channel, i.e. a channel with no internal buffer.-1for a channel with an unbounded (infinite) buffer.policy- theChannels.OverflowPolicyspecifying how the channel (if bounded) will behave if its internal buffer overflows.- Returns:
- The newly created channel
- See Also:
newDoubleChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
newDoubleChannel
public static DoubleChannel newDoubleChannel(int bufferSize)
Creates a new primitivedoublechannel with the given mailbox size with other properties set to their default values. Specifically, theChannels.OverflowPolicywill be set toBLOCK,singleProducerwill be set tofalse, andsingleConsumerwill be set totrue.- Parameters:
bufferSize- if positive, the number of messages that the channel can hold in an internal buffer;0for a transfer channel, i.e. a channel with no internal buffer.-1for a channel with an unbounded (infinite) buffer.- Returns:
- The newly created channel
- See Also:
newDoubleChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
-
isTickerChannel
public static boolean isTickerChannel(ReceivePort<?> channel)
Tests whether a given channel is a ticker channel, namely a channel with a bounded buffer and anoverflow policyofDISPLACE. A ticker channel can be passed to one of thenewTickerConsumerFormethods.- Parameters:
channel- the channel- Returns:
trueif the channel is a ticker,falseotherwise
-
newTickerConsumerFor
public static <Message> ReceivePort<Message> newTickerConsumerFor(Channel<Message> channel)
Creates aReceivePortthat can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACEoverflow policy. Each ticker consumer will yield monotonic messages, namely no message will be received more than once, and the messages will be received in the order they're sent, but if the consumer is too slow, messages could be lost.- Type Parameters:
Message- the message type- Parameters:
channel- a channel of bounded capacity and theDISPLACEoverflow policy.- Returns:
- a new
ReceivePortwhich provides a view to the supplied ticker channel.
-
newTickerConsumerFor
public static IntReceivePort newTickerConsumerFor(IntChannel channel)
Creates anIntReceivePortthat can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACEoverflow policy. Each ticker consumer will yield monotonic messages, namely no message will be received more than once, and the messages will be received in the order they're sent, but if the consumer is too slow, messages could be lost.- Parameters:
channel- anintchannel of bounded capacity and theDISPLACEoverflow policy.- Returns:
- a new
IntReceivePortwhich provides a view to the supplied ticker channel.
-
newTickerConsumerFor
public static LongReceivePort newTickerConsumerFor(LongChannel channel)
Creates aLongReceivePortthat can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACEoverflow policy. Each ticker consumer will yield monotonic messages, namely no message will be received more than once, and the messages will be received in the order they're sent, but if the consumer is too slow, messages could be lost.- Parameters:
channel- alongchannel of bounded capacity and theDISPLACEoverflow policy.- Returns:
- a new
LongReceivePortwhich provides a view to the supplied ticker channel.
-
newTickerConsumerFor
public static FloatReceivePort newTickerConsumerFor(FloatChannel channel)
Creates aFloatReceivePortthat can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACEoverflow policy. Each ticker consumer will yield monotonic messages, namely no message will be received more than once, and the messages will be received in the order they're sent, but if the consumer is too slow, messages could be lost.- Parameters:
channel- afloatchannel of bounded capacity and theDISPLACEoverflow policy.- Returns:
- a new
FloatReceivePortwhich provides a view to the supplied ticker channel.
-
newTickerConsumerFor
public static DoubleReceivePort newTickerConsumerFor(DoubleChannel channel)
Creates aDoubleReceivePortthat can be used to receive messages from a a ticker channel: a channel of bounded capacity and theDISPLACEoverflow policy. Each ticker consumer will yield monotonic messages, namely no message will be received more than once, and the messages will be received in the order they're sent, but if the consumer is too slow, messages could be lost.- Parameters:
channel- adoublechannel of bounded capacity and theDISPLACEoverflow policy.- Returns:
- a new
DoubleReceivePortwhich provides a view to the supplied ticker channel.
-
fiberTransform
public static <S,T> void fiberTransform(FiberFactory fiberFactory, ReceivePort<S> in, SendPort<T> out, SuspendableAction2<? extends ReceivePort<? super S>,? extends SendPort<? extends T>> transformer)
Spawns a fiber that transforms values read from theinchannel and writes values to theoutchannel.- Type Parameters:
S- the message type of the input (source) channel.T- the message type of the output (target) channel.- Parameters:
fiberFactory- will be used to create the fiberin- the input channelout- the output channeltransformer- the transforming operation
-
fiberTransform
public static <S,T> void fiberTransform(ReceivePort<S> in, SendPort<T> out, SuspendableAction2<? extends ReceivePort<? super S>,? extends SendPort<? extends T>> transformer)
Spawns a fiber that transforms values read from theinchannel and writes values to theoutchannel.When the transformation terminates. the output channel is automatically closed. If the transformation terminates abnormally (throws an exception), the output channel is
closed with that exception.- Type Parameters:
S- the message type of the input (source) channel.T- the message type of the output (target) channel.- Parameters:
in- the input channelout- the output channeltransformer- the transforming operation
-
group
public static <M> ReceivePort<M> group(ReceivePort<? extends M>... channels)
Returns aReceivePortthat receives messages from a set of channels. Messages from all given channels are funneled into the returned channel.- Type Parameters:
M- the message type of the receive ports- Parameters:
channels- the receive ports- Returns:
- a
ReceivePortthat receives messages fromchannels.
-
group
public static <M> ReceivePort<M> group(java.util.Collection<? extends ReceivePort<? extends M>> channels)
Returns aReceivePortthat receives messages from a set of channels. Messages from all given channels are funneled into the returned channel.- Type Parameters:
M- the message type of the receive ports- Parameters:
channels- the receive ports- Returns:
- a
ReceivePortthat receives messages fromchannels.
-
mix
public static <M> Mix<? extends M> mix(ReceivePort<? extends M>... channels)
Returns aMixthat receives messages from a set of channels. Messages from all given channels are funneled into the returned channel.- Type Parameters:
M- the message type of the receive ports- Parameters:
channels- the receive ports- Returns:
- a
ReceivePortthat receives messages fromchannels.
-
mix
public static <M> Mix<? extends M> mix(java.util.Collection<? extends ReceivePort<? extends M>> channels)
Returns aMixthat receives messages from a set of channels. Messages from all given channels are funneled into the returned channel.- Type Parameters:
M- the message type of the receive ports- Parameters:
channels- the receive ports- Returns:
- a
ReceivePortthat receives messages fromchannels.
-
filter
public static <M> ReceivePort<M> filter(ReceivePort<M> channel, com.google.common.base.Predicate<M> pred)
Returns aReceivePortthat filters messages that satisfy a predicate from a given channel. All messages (even those not satisfying the predicate) will be consumed from the original channel; those that don't satisfy the predicate will be silently discarded.The returned
ReceivePorthas the samehashCodeaschanneland isequalto it.- Type Parameters:
M- the message type.- Parameters:
channel- The channel to filterpred- the filtering predicate- Returns:
- A
ReceivePortthat will receive all those messages from the original channel which satisfy the predicate (i.e. the predicate returnstrue).
-
map
public static <S,T> ReceivePort<T> map(ReceivePort<S> channel, com.google.common.base.Function<S,T> f)
Returns aReceivePortthat receives messages that are transformed by a given mapping function from a given channel.The returned
ReceivePorthas the samehashCodeaschanneland isequalto it.- Type Parameters:
S- the message type of the source (given) channel.T- the message type of the target (returned) channel.- Parameters:
channel- the channel to transformf- the mapping function- Returns:
- a
ReceivePortthat returns messages that are the result of applying the mapping function to the messages received on the given channel.
-
reduce
public static <S,T> ReceivePort<T> reduce(ReceivePort<S> channel, co.paralleluniverse.common.util.Function2<T,S,T> f, T init)
Returns aReceivePortproviding messages that are transformed from a given channel by a given reduction function.The returned
ReceivePorthas the samehashCodeaschanneland isequalto it.- Type Parameters:
S- The message type of the source (given) channel.T- The message type of the target (returned) channel.- Parameters:
channel- The channel to transform.f- The reduction function.init- The initial input to the reduction function.- Returns:
- a
ReceivePortthat returns messages that are the result of applying the reduction function to the messages received on the given channel.
-
mapErrors
public static <T> ReceivePort<T> mapErrors(ReceivePort<T> channel, com.google.common.base.Function<java.lang.Exception,T> f)
Returns aReceivePortthat maps exceptions thrown by the underlying channel (by channel transformations, or as a result ofSendPort.close(Throwable)) into messages.The returned
ReceivePorthas the samehashCodeaschanneland isequalto it.- Type Parameters:
T- the message type of the target (returned) channel.- Parameters:
channel- the channel to transformf- the exception mapping function- Returns:
- a
ReceivePortthat maps exceptions thrown by the given channel
-
flatMap
public static <S,T> ReceivePort<T> flatMap(ReceivePort<S> channel, com.google.common.base.Function<S,ReceivePort<T>> f)
Returns aReceivePortthat receives messages that are transformed by a given flat-mapping function from a given channel. Unlikemap, the mapping function does not returns a single output message for every input message, but a newReceivePort. All the returned ports are concatenated into a singleReceivePortthat receives the messages received by all the ports in order.To return a single value the mapping function can make use of
singletonReceivePort. To return a collection, it can make use oftoReceivePort(Iterable). To emit no values, the function can returnemptyReceivePort()ornull.The returned
ReceivePortcan only be safely used by a single receiver strand.The returned
ReceivePorthas the samehashCodeaschanneland isequalto it.- Type Parameters:
S- the message type of the source (given) channel.T- the message type of the target (returned) channel.- Parameters:
channel- the channel to transformf- the mapping function- Returns:
- a
ReceivePortthat returns messages that are the result of applying the mapping function to the messages received on the given channel.
-
forEach
public static <T> void forEach(ReceivePort<T> channel, SuspendableAction1<T> action) throws SuspendExecution, java.lang.InterruptedException
Performs the given action on each message received by the given channel. This method returns only after all messages have been consumed and the channel has been closed.- Type Parameters:
T- the message type- Parameters:
channel- the channelaction- the actions- Throws:
java.lang.InterruptedExceptionSuspendExecution
-
take
public static <T> ReceivePort<T> take(ReceivePort<T> channel, long count)
- Type Parameters:
T- The message type.- Parameters:
channel- The channel.count- The maximum number of messages extracted from the underlying channel.- Returns:
- a
ReceivePortthat can provide at mostcountmessages fromchannel.
-
zip
public static <M> ReceivePort<M> zip(java.util.List<? extends ReceivePort<?>> cs, com.google.common.base.Function<java.lang.Object[],M> f)
Returns aReceivePortthat combines each vector of messages from a list of channels into a single combined message.- Type Parameters:
M- The type of the combined message- Parameters:
f- The combining functioncs- A vector of channels- Returns:
- A zipping
ReceivePort
-
zip
public static <M,S1,S2> ReceivePort<M> zip(ReceivePort<S1> c1, ReceivePort<S2> c2, co.paralleluniverse.common.util.Function2<S1,S2,M> f)
Returns aReceivePortthat combines each vector of messages from a vector of channels into a single combined message.- Type Parameters:
M- The type of the combined messageS1- The message type of the first input portS2- The message type of the second input port- Parameters:
f- The combining functionc1- The first input portc2- The second input port- Returns:
- A zipping
ReceivePort
-
zip
public static <M,S1,S2,S3> ReceivePort<M> zip(ReceivePort<S1> c1, ReceivePort<S2> c2, ReceivePort<S3> c3, co.paralleluniverse.common.util.Function3<S1,S2,S3,M> f)
Returns aReceivePortthat combines each vector of messages from a vector of channels into a single combined message.- Type Parameters:
M- The type of the combined messageS1- The message type of the first input portS2- The message type of the second input portS3- The message type of the third input port- Parameters:
f- The combining functionc1- The first input portc2- The second input portc3- The third input port- Returns:
- A zipping
ReceivePort
-
zip
public static <M,S1,S2,S3,S4> ReceivePort<M> zip(ReceivePort<S1> c1, ReceivePort<S2> c2, ReceivePort<S3> c3, ReceivePort<S4> c4, co.paralleluniverse.common.util.Function4<S1,S2,S3,S4,M> f)
Returns aReceivePortthat combines each vector of messages from a vector of channels into a single combined message.- Type Parameters:
M- The type of the combined messageS1- The message type of the first input portS2- The message type of the second input portS3- The message type of the third input portS4- The message type of the fourth input port- Parameters:
f- The combining functionc1- The first input portc2- The second input portc3- The third input portc4- The fourth input port- Returns:
- A zipping
ReceivePort
-
zip
public static <M,S1,S2,S3,S4,S5> ReceivePort<M> zip(ReceivePort<S1> c1, ReceivePort<S2> c2, ReceivePort<S3> c3, ReceivePort<S4> c4, ReceivePort<S5> c5, co.paralleluniverse.common.util.Function5<S1,S2,S3,S4,S5,M> f)
Returns aReceivePortthat combines each vector of messages from a vector of channels into a single combined message.- Type Parameters:
M- The type of the combined messageS1- The message type of the first input portS2- The message type of the second input portS3- The message type of the third input portS4- The message type of the fourth input portS5- The message type of the fifth input port- Parameters:
f- The combining functionc1- The first input portc2- The second input portc3- The third input portc4- The fourth input portc5- The fifth input port- Returns:
- A zipping
ReceivePort
-
transform
public static <M> TransformingReceivePort<M> transform(ReceivePort<M> channel)
Returns aTransformingReceivePortwrapping the given channel, which may be used for functional transformations.- Type Parameters:
M- the message type- Parameters:
channel- the channel to transform- Returns:
- the transformed
ReceivePort
-
filterSend
public static <M> SendPort<M> filterSend(SendPort<M> channel, com.google.common.base.Predicate<M> pred)
Returns aSendPortthat filters messages that satisfy a predicate before sending to a given channel. Messages that don't satisfy the predicate will be silently discarded when sent.The returned
SendPorthas the samehashCodeaschanneland isequalto it.- Type Parameters:
M- the message type.- Parameters:
channel- The channel to filterpred- the filtering predicate- Returns:
- A
SendPortthat will send only those messages which satisfy the predicate (i.e. the predicate returnstrue) to the given channel.
-
mapSend
public static <S,T> SendPort<S> mapSend(SendPort<T> channel, com.google.common.base.Function<S,T> f)
Returns aSendPortthat transforms messages by applying a given mapping function before sending them to a given channel.The returned
SendPorthas the samehashCodeaschanneland isequalto it.- Type Parameters:
S- the message type of the source (returned) channel.T- the message type of the target (given) channel.- Parameters:
channel- the channel to transformf- the mapping function- Returns:
- a
SendPortthat passes messages to the given channel after transforming them by applying the mapping function.
-
reduceSend
public static <S,T> SendPort<S> reduceSend(SendPort<T> channel, co.paralleluniverse.common.util.Function2<T,S,T> f, T init)
Returns aSendPortaccepting messages that are transformed by a reduction function.The returned
SendPorthas the samehashCodeaschanneland isequalto it.- Type Parameters:
S- The message type of the source (returned) channel.T- The message type of the target (given) channel.- Parameters:
channel- The channel to transform.f- The reduction function.init- The initial input to the reduction function.- Returns:
- a
ReceivePortthat returns messages that are the result of applying the reduction function to the messages received on the given channel.
-
flatMapSend
public static <S,T> SendPort<S> flatMapSend(FiberFactory fiberFactory, Channel<S> pipe, SendPort<T> channel, com.google.common.base.Function<S,ReceivePort<T>> f)
Returns aSendPortthat sends messages that are transformed by a given flat-mapping function into a given channel. Unlikemap, the mapping function does not returns a single output message for every input message, but a newReceivePort. All the returned ports are concatenated and sent to the channel.To return a single value the mapping function can make use of
singletonReceivePort. To return a collection, it can make use oftoReceivePort(Iterable). To emit no values, the function can returnemptyReceivePort()ornull.If multiple producers send messages into the channel, the messages from the
ReceivePorts returned by the mapping function may be interleaved with other messages.The returned
SendPorthas the samehashCodeaschanneland isequalto it.- Type Parameters:
S- the message type of the source (given) channel.T- the message type of the target (returned) channel.- Parameters:
pipe- an intermediate channel used in the flat-mapping operation. Messages are first sent to this channel before being transformed.channel- the channel to transformf- the mapping function- Returns:
- a
ReceivePortthat returns messages that are the result of applying the mapping function to the messages received on the given channel.
-
flatMapSend
public static <S,T> SendPort<S> flatMapSend(Channel<S> pipe, SendPort<T> channel, com.google.common.base.Function<S,ReceivePort<T>> f)
-
transformSend
public static <M> TransformingSendPort<M> transformSend(SendPort<M> channel)
Returns aTransformingSendPortwrapping the given channel, which may be used for functional transformations.- Type Parameters:
M- the message type- Parameters:
channel- the channel to transform- Returns:
- the transformed
SendPort
-
emptyReceivePort
public static <T> ReceivePort<T> emptyReceivePort()
Returns an emptyReceivePort. The port is closed and receives no messages;
-
singletonReceivePort
public static <T> ReceivePort<T> singletonReceivePort(T object)
Returns a newly createdReceivePortthat receives a single message: the object given to the function.- Type Parameters:
T-- Parameters:
object- the single object that will be returned by theReceivePort.
-
toReceivePort
public static <T> ReceivePort<T> toReceivePort(java.util.Iterator<T> iterator)
Returns a newly createdReceivePortthat receives all the elements iterated by the iterator.- Type Parameters:
T-- Parameters:
iterator- the iterator to transform into aReceivePort.
-
toReceivePort
public static <T> ReceivePort<T> toReceivePort(java.lang.Iterable<T> iterable)
Returns a newly createdReceivePortthat receives all the elements iterated by the iterable.- Type Parameters:
T-- Parameters:
iterable- the iterable to transform into aReceivePort.
-
-