Class Channels


  • public final class Channels
    extends java.lang.Object
    A utility class for creating and manipulating channels.
    • Method Detail

      • newChannel

        public static <Message> Channel<Message> newChannel​(int bufferSize,
                                                            Channels.OverflowPolicy policy,
                                                            boolean singleProducer,
                                                            boolean singleConsumer)
        Creates a new channel with the given properties.

        Some combinations of properties are unsupported, and will throw an IllegalArgumentException if requested:

        • an unbounded channel with multiple consumers
        • a transfer channel with any overflow policy other than BLOCK
        • an overflow policy of DISPLACE with multiple consumers
        An unbounded channel ignores its overflow policy as it never overflows.
        Type Parameters:
        Message - the type of messages that can be sent to this channel.
        Parameters:
        bufferSize - if positive, the number of messages that the channel can hold in an internal buffer; 0 for a transfer channel, i.e. a channel with no internal buffer. -1 for a channel with an unbounded (infinite) buffer.
        policy - the Channels.OverflowPolicy specifying how the channel (if bounded) will behave if its internal buffer overflows.
        singleProducer - whether the channel will be used by a single producer strand.
        singleConsumer - whether the channel will be used by a single consumer strand.
        Returns:
        The newly created channel
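
        The restrictions listed above can be summarized as a small validation routine. The sketch below is only a model of the documented rules, not Quasar's implementation: the class ChannelConfigSketch, its Policy enum (limited to the two policies named in the text) and the helper isSupported are hypothetical names introduced for illustration.

```java
/** Illustrative model of the rules listed above; this is not Quasar source code. */
final class ChannelConfigSketch {
    /** Only the two policies named in the text are modeled. */
    enum Policy { BLOCK, DISPLACE }

    /**
     * Rejects the combinations documented as unsupported.
     * bufferSize: positive = bounded buffer, 0 = transfer channel, -1 = unbounded.
     */
    static void validate(int bufferSize, Policy policy, boolean singleProducer, boolean singleConsumer) {
        boolean unbounded = bufferSize == -1;
        boolean transfer = bufferSize == 0;
        if (unbounded && !singleConsumer)
            throw new IllegalArgumentException("unbounded channel with multiple consumers");
        if (transfer && policy != Policy.BLOCK)
            throw new IllegalArgumentException("transfer channel requires the BLOCK policy");
        // An unbounded channel never overflows, so its policy is not inspected.
        if (!unbounded && policy == Policy.DISPLACE && !singleConsumer)
            throw new IllegalArgumentException("DISPLACE with multiple consumers");
        // singleProducer does not take part in any of the documented restrictions.
    }

    /** Convenience wrapper: true if validate accepts the combination. */
    static boolean isSupported(int bufferSize, Policy policy, boolean singleProducer, boolean singleConsumer) {
        try {
            validate(bufferSize, policy, singleProducer, singleConsumer);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isSupported(16, Policy.BLOCK, false, true));     // true
        System.out.println(isSupported(-1, Policy.BLOCK, false, false));    // false
        System.out.println(isSupported(0, Policy.DISPLACE, true, true));    // false
        System.out.println(isSupported(16, Policy.DISPLACE, true, false));  // false
    }
}
```

        In this model an unbounded channel with a single consumer is accepted whatever its policy, which mirrors the note that an unbounded channel ignores its overflow policy.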
      • newChannel

        public static <Message> Channel<Message> newChannel​(int bufferSize,
                                                            Channels.OverflowPolicy policy)
        Creates a new channel with the given buffer size and Channels.OverflowPolicy, with other properties set to their default values. Specifically, singleProducer will be set to false, while singleConsumer will be set to true.
        Type Parameters:
        Message - the type of messages that can be sent to this channel.
        Parameters:
        bufferSize - if positive, the number of messages that the channel can hold in an internal buffer; 0 for a transfer channel, i.e. a channel with no internal buffer. -1 for a channel with an unbounded (infinite) buffer.
        policy - the Channels.OverflowPolicy specifying how the channel (if bounded) will behave if its internal buffer overflows.
        Returns:
        The newly created channel
        See Also:
        newChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
      • newChannel

        public static <Message> Channel<Message> newChannel​(int bufferSize)
        Creates a new channel with the given buffer size, with other properties set to their default values. Specifically, the Channels.OverflowPolicy will be set to BLOCK, singleProducer will be set to false, and singleConsumer will be set to true.
        Type Parameters:
        Message - the type of messages that can be sent to this channel.
        Parameters:
        bufferSize - if positive, the number of messages that the channel can hold in an internal buffer; 0 for a transfer channel, i.e. a channel with no internal buffer. -1 for a channel with an unbounded (infinite) buffer.
        Returns:
        The newly created channel
        See Also:
        newChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
      • newIntChannel

        public static IntChannel newIntChannel​(int bufferSize,
                                               Channels.OverflowPolicy policy,
                                               boolean singleProducer,
                                               boolean singleConsumer)
        Creates a new primitive int channel with the given properties.

        Some combinations of properties are unsupported, and will throw an IllegalArgumentException if requested:

        • multiple consumers
        • a transfer channel with any overflow policy other than BLOCK
        • An overflow policy of DISPLACE with multiple consumers.
        An unbounded channel ignores its overflow policy as it never overflows.
        Parameters:
        bufferSize - if positive, the number of messages that the channel can hold in an internal buffer; 0 for a transfer channel, i.e. a channel with no internal buffer. -1 for a channel with an unbounded (infinite) buffer.
        policy - the Channels.OverflowPolicy specifying how the channel (if bounded) will behave if its internal buffer overflows.
        singleProducer - whether the channel will be used by a single producer strand.
        singleConsumer - whether the channel will be used by a single consumer strand. Currently primitive channels only support a single consumer, so this argument must be set to true.
        Returns:
        The newly created channel
      • newIntChannel

        public static IntChannel newIntChannel​(int bufferSize)
        Creates a new primitive int channel with the given buffer size, with other properties set to their default values. Specifically, the Channels.OverflowPolicy will be set to BLOCK, singleProducer will be set to false, and singleConsumer will be set to true.
        Parameters:
        bufferSize - if positive, the number of messages that the channel can hold in an internal buffer; 0 for a transfer channel, i.e. a channel with no internal buffer. -1 for a channel with an unbounded (infinite) buffer.
        Returns:
        The newly created channel
        See Also:
        newIntChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
      • newLongChannel

        public static LongChannel newLongChannel​(int bufferSize,
                                                 Channels.OverflowPolicy policy,
                                                 boolean singleProducer,
                                                 boolean singleConsumer)
        Creates a new primitive long channel with the given properties.

        Some combinations of properties are unsupported, and will throw an IllegalArgumentException if requested:

        • multiple consumers
        • a transfer channel with any overflow policy other than BLOCK
        • An overflow policy of DISPLACE with multiple consumers.
        An unbounded channel ignores its overflow policy as it never overflows.
        Parameters:
        bufferSize - if positive, the number of messages that the channel can hold in an internal buffer; 0 for a transfer channel, i.e. a channel with no internal buffer. -1 for a channel with an unbounded (infinite) buffer.
        policy - the Channels.OverflowPolicy specifying how the channel (if bounded) will behave if its internal buffer overflows.
        singleProducer - whether the channel will be used by a single producer strand.
        singleConsumer - whether the channel will be used by a single consumer strand. Currently primitive channels only support a single consumer, so this argument must be set to true.
        Returns:
        The newly created channel
      • newLongChannel

        public static LongChannel newLongChannel​(int bufferSize)
        Creates a new primitive long channel with the given buffer size, with other properties set to their default values. Specifically, the Channels.OverflowPolicy will be set to BLOCK, singleProducer will be set to false, and singleConsumer will be set to true.
        Parameters:
        bufferSize - if positive, the number of messages that the channel can hold in an internal buffer; 0 for a transfer channel, i.e. a channel with no internal buffer. -1 for a channel with an unbounded (infinite) buffer.
        Returns:
        The newly created channel
        See Also:
        newLongChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
      • newFloatChannel

        public static FloatChannel newFloatChannel​(int bufferSize,
                                                   Channels.OverflowPolicy policy,
                                                   boolean singleProducer,
                                                   boolean singleConsumer)
        Creates a new primitive float channel with the given properties.

        Some combinations of properties are unsupported, and will throw an IllegalArgumentException if requested:

        • multiple consumers
        • a transfer channel with any overflow policy other than BLOCK
        • An overflow policy of DISPLACE with multiple consumers.
        An unbounded channel ignores its overflow policy as it never overflows.
        Parameters:
        bufferSize - if positive, the number of messages that the channel can hold in an internal buffer; 0 for a transfer channel, i.e. a channel with no internal buffer. -1 for a channel with an unbounded (infinite) buffer.
        policy - the Channels.OverflowPolicy specifying how the channel (if bounded) will behave if its internal buffer overflows.
        singleProducer - whether the channel will be used by a single producer strand.
        singleConsumer - whether the channel will be used by a single consumer strand. Currently primitive channels only support a single consumer, so this argument must be set to true.
        Returns:
        The newly created channel
      • newFloatChannel

        public static FloatChannel newFloatChannel​(int bufferSize)
        Creates a new primitive float channel with the given buffer size, with other properties set to their default values. Specifically, the Channels.OverflowPolicy will be set to BLOCK, singleProducer will be set to false, and singleConsumer will be set to true.
        Parameters:
        bufferSize - if positive, the number of messages that the channel can hold in an internal buffer; 0 for a transfer channel, i.e. a channel with no internal buffer. -1 for a channel with an unbounded (infinite) buffer.
        Returns:
        The newly created channel
        See Also:
        newFloatChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
      • newDoubleChannel

        public static DoubleChannel newDoubleChannel​(int bufferSize,
                                                     Channels.OverflowPolicy policy,
                                                     boolean singleProducer,
                                                     boolean singleConsumer)
        Creates a new primitive double channel with the given properties.

        Some combinations of properties are unsupported, and will throw an IllegalArgumentException if requested:

        • multiple consumers
        • a transfer channel with any overflow policy other than BLOCK
        • An overflow policy of DISPLACE with multiple consumers.
        An unbounded channel ignores its overflow policy as it never overflows.
        Parameters:
        bufferSize - if positive, the number of messages that the channel can hold in an internal buffer; 0 for a transfer channel, i.e. a channel with no internal buffer. -1 for a channel with an unbounded (infinite) buffer.
        policy - the Channels.OverflowPolicy specifying how the channel (if bounded) will behave if its internal buffer overflows.
        singleProducer - whether the channel will be used by a single producer strand.
        singleConsumer - whether the channel will be used by a single consumer strand. Currently primitive channels only support a single consumer, so this argument must be set to true.
        Returns:
        The newly created channel
      • newDoubleChannel

        public static DoubleChannel newDoubleChannel​(int bufferSize)
        Creates a new primitive double channel with the given buffer size, with other properties set to their default values. Specifically, the Channels.OverflowPolicy will be set to BLOCK, singleProducer will be set to false, and singleConsumer will be set to true.
        Parameters:
        bufferSize - if positive, the number of messages that the channel can hold in an internal buffer; 0 for a transfer channel, i.e. a channel with no internal buffer. -1 for a channel with an unbounded (infinite) buffer.
        Returns:
        The newly created channel
        See Also:
        newDoubleChannel(int, co.paralleluniverse.strands.channels.Channels.OverflowPolicy, boolean, boolean)
      • isTickerChannel

        public static boolean isTickerChannel​(ReceivePort<?> channel)
        Tests whether a given channel is a ticker channel, namely a channel with a bounded buffer and an overflow policy of DISPLACE. A ticker channel can be passed to one of the newTickerConsumerFor methods.
        Parameters:
        channel - the channel
        Returns:
        true if the channel is a ticker, false otherwise
      • newTickerConsumerFor

        public static <Message> ReceivePort<Message> newTickerConsumerFor​(Channel<Message> channel)
        Creates a ReceivePort that can be used to receive messages from a ticker channel: a channel of bounded capacity and the DISPLACE overflow policy. Each ticker consumer will yield monotonic messages, namely no message will be received more than once, and the messages will be received in the order they're sent, but if the consumer is too slow, messages could be lost.
        Type Parameters:
        Message - the message type
        Parameters:
        channel - a channel of bounded capacity and the DISPLACE overflow policy.
        Returns:
        a new ReceivePort which provides a view to the supplied ticker channel.
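
        The ticker behavior described above (independent consumers, in-order delivery, possible loss for slow consumers) can be pictured with a single-threaded ring-buffer model. This is a sketch under stated assumptions, not Quasar's implementation: TickerSketch, View, tryReceive and drain are hypothetical names, null is used to mean "nothing available", and a new view is assumed to start at the oldest message still buffered.

```java
import java.util.ArrayList;
import java.util.List;

/** Single-threaded model of a ticker: a bounded DISPLACE buffer with per-consumer cursors. */
final class TickerSketch<T> {
    private final Object[] ring;
    private long sent; // total number of messages ever sent

    TickerSketch(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        ring = new Object[capacity];
    }

    /** DISPLACE: a send never waits; once the buffer is full it overwrites the oldest slot. */
    void send(T message) {
        ring[(int) (sent % ring.length)] = message;
        sent++;
    }

    /** Each consumer is an independent view with its own read position. */
    View newConsumer() {
        return new View();
    }

    final class View {
        // Assumption: a new view starts at the oldest message still buffered.
        private long next = Math.max(0, sent - ring.length);

        /** Returns the next message in send order, or null if nothing new is available. */
        @SuppressWarnings("unchecked")
        T tryReceive() {
            long oldest = Math.max(0, sent - ring.length);
            if (next < oldest) next = oldest; // too slow: displaced messages are lost
            if (next == sent) return null;
            return (T) ring[(int) (next++ % ring.length)];
        }

        /** Receives everything currently available. */
        List<T> drain() {
            List<T> out = new ArrayList<>();
            for (T m = tryReceive(); m != null; m = tryReceive()) out.add(m);
            return out;
        }
    }
}
```

        With a capacity of 3, a view that keeps up sees every message, while a view that reads only after five sends sees just the last three; neither view sees a message twice or out of order.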
      • newTickerConsumerFor

        public static IntReceivePort newTickerConsumerFor​(IntChannel channel)
        Creates an IntReceivePort that can be used to receive messages from a ticker channel: a channel of bounded capacity and the DISPLACE overflow policy. Each ticker consumer will yield monotonic messages, namely no message will be received more than once, and the messages will be received in the order they're sent, but if the consumer is too slow, messages could be lost.
        Parameters:
        channel - an int channel of bounded capacity and the DISPLACE overflow policy.
        Returns:
        a new IntReceivePort which provides a view to the supplied ticker channel.
      • newTickerConsumerFor

        public static LongReceivePort newTickerConsumerFor​(LongChannel channel)
        Creates a LongReceivePort that can be used to receive messages from a ticker channel: a channel of bounded capacity and the DISPLACE overflow policy. Each ticker consumer will yield monotonic messages, namely no message will be received more than once, and the messages will be received in the order they're sent, but if the consumer is too slow, messages could be lost.
        Parameters:
        channel - a long channel of bounded capacity and the DISPLACE overflow policy.
        Returns:
        a new LongReceivePort which provides a view to the supplied ticker channel.
      • newTickerConsumerFor

        public static FloatReceivePort newTickerConsumerFor​(FloatChannel channel)
        Creates a FloatReceivePort that can be used to receive messages from a ticker channel: a channel of bounded capacity and the DISPLACE overflow policy. Each ticker consumer will yield monotonic messages, namely no message will be received more than once, and the messages will be received in the order they're sent, but if the consumer is too slow, messages could be lost.
        Parameters:
        channel - a float channel of bounded capacity and the DISPLACE overflow policy.
        Returns:
        a new FloatReceivePort which provides a view to the supplied ticker channel.
      • newTickerConsumerFor

        public static DoubleReceivePort newTickerConsumerFor​(DoubleChannel channel)
        Creates a DoubleReceivePort that can be used to receive messages from a ticker channel: a channel of bounded capacity and the DISPLACE overflow policy. Each ticker consumer will yield monotonic messages, namely no message will be received more than once, and the messages will be received in the order they're sent, but if the consumer is too slow, messages could be lost.
        Parameters:
        channel - a double channel of bounded capacity and the DISPLACE overflow policy.
        Returns:
        a new DoubleReceivePort which provides a view to the supplied ticker channel.
      • fiberTransform

        public static <S,​T> void fiberTransform​(FiberFactory fiberFactory,
                                                      ReceivePort<S> in,
                                                      SendPort<T> out,
                                                      SuspendableAction2<? extends ReceivePort<? super S>,​? extends SendPort<? extends T>> transformer)
        Spawns a fiber that transforms values read from the in channel and writes values to the out channel.

        Type Parameters:
        S - the message type of the input (source) channel.
        T - the message type of the output (target) channel.
        Parameters:
        fiberFactory - will be used to create the fiber
        in - the input channel
        out - the output channel
        transformer - the transforming operation
      • fiberTransform

        public static <S,​T> void fiberTransform​(ReceivePort<S> in,
                                                      SendPort<T> out,
                                                      SuspendableAction2<? extends ReceivePort<? super S>,​? extends SendPort<? extends T>> transformer)
        Spawns a fiber that transforms values read from the in channel and writes values to the out channel.

        When the transformation terminates, the output channel is automatically closed. If the transformation terminates abnormally (throws an exception), the output channel is closed with that exception.

        Type Parameters:
        S - the message type of the input (source) channel.
        T - the message type of the output (target) channel.
        Parameters:
        in - the input channel
        out - the output channel
        transformer - the transforming operation
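
        The contract described above (run the transformer concurrently, then close the output, carrying the exception if there was one) can be modeled with plain threads and queues. This is only a sketch: Quasar uses fibers and real channels, whereas TransformSketch, its Transformer interface and the CLOSED marker are hypothetical stand-ins, and placing the exception on the queue is just one way to picture "closed with that exception".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Thread-based model of the contract described above; not Quasar code. */
final class TransformSketch {
    /** Hypothetical end-of-stream marker standing in for a closed channel. */
    static final Object CLOSED = new Object();

    /** Hypothetical stand-in for the transformer callback. */
    interface Transformer {
        void run(BlockingQueue<Object> in, BlockingQueue<Object> out) throws Exception;
    }

    /** Runs the transformer on a new thread and "closes" out when it terminates. */
    static Thread transform(BlockingQueue<Object> in, BlockingQueue<Object> out, Transformer transformer) {
        Thread worker = new Thread(() -> {
            try {
                transformer.run(in, out);
                out.add(CLOSED);   // normal termination: plain close
            } catch (Exception e) {
                out.add(e);        // abnormal termination: close carrying the cause
            }
        });
        worker.start();
        return worker;
    }

    /** Feeds 1, 2, 3 through a doubling transformer and returns everything written to out. */
    static List<Object> demo() {
        BlockingQueue<Object> in = new LinkedBlockingQueue<>();
        BlockingQueue<Object> out = new LinkedBlockingQueue<>();
        in.add(1);
        in.add(2);
        in.add(3);
        in.add(CLOSED);
        Thread worker = transform(in, out, (src, dst) -> {
            for (Object m = src.take(); m != CLOSED; m = src.take()) {
                dst.add((Integer) m * 2);
            }
        });
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        System.out.println(demo().size()); // 4: three doubled values plus the close marker
    }
}
```

        The caller never closes the output itself; in the model, as in the description, that responsibility belongs to the spawned worker.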
      • group

        public static <M> ReceivePort<M> group​(ReceivePort<? extends M>... channels)
        Returns a ReceivePort that receives messages from a set of channels. Messages from all given channels are funneled into the returned channel.
        Type Parameters:
        M - the message type of the receive ports
        Parameters:
        channels - the receive ports
        Returns:
        a ReceivePort that receives messages from channels.
      • group

        public static <M> ReceivePort<M> group​(java.util.Collection<? extends ReceivePort<? extends M>> channels)
        Returns a ReceivePort that receives messages from a set of channels. Messages from all given channels are funneled into the returned channel.
        Type Parameters:
        M - the message type of the receive ports
        Parameters:
        channels - the receive ports
        Returns:
        a ReceivePort that receives messages from channels.
      • mix

        public static <M> Mix<? extends M> mix​(ReceivePort<? extends M>... channels)
        Returns a Mix that receives messages from a set of channels. Messages from all given channels are funneled into the returned channel.
        Type Parameters:
        M - the message type of the receive ports
        Parameters:
        channels - the receive ports
        Returns:
        a Mix that receives messages from channels.
      • mix

        public static <M> Mix<? extends M> mix​(java.util.Collection<? extends ReceivePort<? extends M>> channels)
        Returns a Mix that receives messages from a set of channels. Messages from all given channels are funneled into the returned channel.
        Type Parameters:
        M - the message type of the receive ports
        Parameters:
        channels - the receive ports
        Returns:
        a Mix that receives messages from channels.
      • filter

        public static <M> ReceivePort<M> filter​(ReceivePort<M> channel,
                                                com.google.common.base.Predicate<M> pred)
        Returns a ReceivePort that delivers only those messages from a given channel that satisfy a predicate. All messages (even those not satisfying the predicate) will be consumed from the original channel; those that don't satisfy the predicate will be silently discarded.

        The returned ReceivePort has the same hashCode as channel and is equal to it.

        Type Parameters:
        M - the message type.
        Parameters:
        channel - The channel to filter
        pred - the filtering predicate
        Returns:
        A ReceivePort that will receive all those messages from the original channel which satisfy the predicate (i.e. the predicate returns true).
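
        The point that rejected messages are still consumed from the original channel is easy to miss. The following sketch models a receive operation as a java.util.function.Supplier that returns null once the source is exhausted; FilterSketch and its source helper are hypothetical, and the standard-library Predicate is used here in place of the Guava type from the signature so that the example stays self-contained.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Pull-based model of a filtering receive port; not Quasar code. */
final class FilterSketch {
    /** Hypothetical source: hands out the items one by one, then null to signal "closed". */
    static <M> Supplier<M> source(Iterable<M> items) {
        Iterator<M> it = items.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    /** Keeps pulling from the source until a message passes the predicate or the source ends. */
    static <M> Supplier<M> filter(Supplier<M> receive, Predicate<? super M> pred) {
        return () -> {
            for (M m = receive.get(); m != null; m = receive.get()) {
                if (pred.test(m)) return m; // rejected messages were consumed and are dropped
            }
            return null;
        };
    }

    public static void main(String[] args) {
        Supplier<Integer> numbers = source(Arrays.asList(1, 2, 3, 4, 5));
        Supplier<Integer> evens = filter(numbers, n -> n % 2 == 0);
        System.out.println(evens.get()); // 2
        System.out.println(evens.get()); // 4
        System.out.println(evens.get()); // null
    }
}
```

        After the third call the underlying source is empty as well: the trailing 5 was pulled from it and discarded, so another reader of the same source would never see it.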
      • map

        public static <S,​T> ReceivePort<T> map​(ReceivePort<S> channel,
                                                     com.google.common.base.Function<S,​T> f)
        Returns a ReceivePort that receives messages that are transformed by a given mapping function from a given channel.

        The returned ReceivePort has the same hashCode as channel and is equal to it.

        Type Parameters:
        S - the message type of the source (given) channel.
        T - the message type of the target (returned) channel.
        Parameters:
        channel - the channel to transform
        f - the mapping function
        Returns:
        a ReceivePort that returns messages that are the result of applying the mapping function to the messages received on the given channel.
      • reduce

        public static <S,​T> ReceivePort<T> reduce​(ReceivePort<S> channel,
                                                        co.paralleluniverse.common.util.Function2<T,​S,​T> f,
                                                        T init)
        Returns a ReceivePort providing messages that are transformed from a given channel by a given reduction function.

        The returned ReceivePort has the same hashCode as channel and is equal to it.

        Type Parameters:
        S - The message type of the source (given) channel.
        T - The message type of the target (returned) channel.
        Parameters:
        channel - The channel to transform.
        f - The reduction function.
        init - The initial input to the reduction function.
        Returns:
        a ReceivePort that returns messages that are the result of applying the reduction function to the messages received on the given channel.
      • mapErrors

        public static <T> ReceivePort<T> mapErrors​(ReceivePort<T> channel,
                                                   com.google.common.base.Function<java.lang.Exception,​T> f)
        Returns a ReceivePort that maps exceptions thrown by the underlying channel (by channel transformations, or as a result of SendPort.close(Throwable) ) into messages.

        The returned ReceivePort has the same hashCode as channel and is equal to it.

        Type Parameters:
        T - the message type of the target (returned) channel.
        Parameters:
        channel - the channel to transform
        f - the exception mapping function
        Returns:
        a ReceivePort that maps exceptions thrown by the given channel
      • flatMap

        public static <S,​T> ReceivePort<T> flatMap​(ReceivePort<S> channel,
                                                         com.google.common.base.Function<S,​ReceivePort<T>> f)
        Returns a ReceivePort that receives messages that are transformed by a given flat-mapping function from a given channel. Unlike map, the mapping function does not return a single output message for every input message, but a new ReceivePort. All the returned ports are concatenated into a single ReceivePort that receives the messages received by all the ports in order.

        To return a single value the mapping function can make use of singletonReceivePort. To return a collection, it can make use of toReceivePort(Iterable). To emit no values, the function can return emptyReceivePort() or null.

        The returned ReceivePort can only be safely used by a single receiver strand.

        The returned ReceivePort has the same hashCode as channel and is equal to it.

        Type Parameters:
        S - the message type of the source (given) channel.
        T - the message type of the target (returned) channel.
        Parameters:
        channel - the channel to transform
        f - the mapping function
        Returns:
        a ReceivePort that returns messages that are the result of applying the mapping function to the messages received on the given channel.
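
        The concatenation semantics can be illustrated with ordinary iterators standing in for receive ports. This is a sketch, not Quasar's implementation: FlatMapSketch is a hypothetical name, java.util.Iterator replaces ReceivePort, and a null result from the mapping function is treated as "emit nothing", as the description above allows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Iterator-based model of flat-mapping; not Quasar code. */
final class FlatMapSketch {
    /** Concatenates, in order, the sub-sequences produced by f for each source element. */
    static <S, T> Iterator<T> flatMap(Iterator<S> source, Function<? super S, ? extends Iterator<T>> f) {
        return new Iterator<T>() {
            private Iterator<T> current = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                while (!current.hasNext()) {
                    if (!source.hasNext()) return false;
                    Iterator<T> next = f.apply(source.next());
                    current = (next != null) ? next : Collections.<T>emptyIterator();
                }
                return true;
            }

            @Override
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                return current.next();
            }
        };
    }

    /** Maps 1 to [1], 2 to nothing (null), and 3 to [3, 3, 3]. */
    static List<Integer> demo() {
        Function<Integer, Iterator<Integer>> f =
                n -> n == 2 ? null : Collections.nCopies(n, n).iterator();
        Iterator<Integer> out = flatMap(Arrays.asList(1, 2, 3).iterator(), f);
        List<Integer> result = new ArrayList<>();
        while (out.hasNext()) result.add(out.next());
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 3, 3, 3]
    }
}
```

        The model keeps a single "current" sub-sequence as mutable state, which gives some intuition for why the returned port is documented as safe for only one receiver strand.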
      • forEach

        public static <T> void forEach​(ReceivePort<T> channel,
                                       SuspendableAction1<T> action)
                                throws SuspendExecution,
                                       java.lang.InterruptedException
        Performs the given action on each message received by the given channel. This method returns only after all messages have been consumed and the channel has been closed.
        Type Parameters:
        T - the message type
        Parameters:
        channel - the channel
        action - the action
        Throws:
        java.lang.InterruptedException
        SuspendExecution
      • take

        public static <T> ReceivePort<T> take​(ReceivePort<T> channel,
                                              long count)
        Returns a ReceivePort that can provide at most count messages from channel.
        Type Parameters:
        T - The message type.
        Parameters:
        channel - The channel.
        count - The maximum number of messages extracted from the underlying channel.
        Returns:
        a ReceivePort that can provide at most count messages from channel.
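
        A minimal model of this behavior, again with a Supplier that returns null when nothing more is available. TakeSketch and source are hypothetical names, and the choice to stop touching the underlying source once the limit is reached follows the description of count as the maximum number of messages extracted from the underlying channel.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

/** Pull-based model of a count-limited receive port; not Quasar code. */
final class TakeSketch {
    /** Hypothetical source: hands out the items one by one, then null to signal "closed". */
    static <T> Supplier<T> source(Iterable<T> items) {
        Iterator<T> it = items.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    /** Delivers at most count messages and never pulls more than that from the source. */
    static <T> Supplier<T> take(Supplier<T> receive, long count) {
        return new Supplier<T>() {
            private long remaining = count;

            @Override
            public T get() {
                if (remaining <= 0) return null; // limit reached: behave as closed
                T m = receive.get();
                if (m != null) remaining--;
                return m;
            }
        };
    }

    public static void main(String[] args) {
        Supplier<String> letters = source(Arrays.asList("a", "b", "c"));
        Supplier<String> firstTwo = take(letters, 2);
        System.out.println(firstTwo.get()); // a
        System.out.println(firstTwo.get()); // b
        System.out.println(firstTwo.get()); // null
        System.out.println(letters.get());  // c (left untouched by take)
    }
}
```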
      • zip

        public static <M> ReceivePort<M> zip​(java.util.List<? extends ReceivePort<?>> cs,
                                             com.google.common.base.Function<java.lang.Object[],​M> f)
        Returns a ReceivePort that combines each vector of messages from a list of channels into a single combined message.
        Type Parameters:
        M - The type of the combined message
        Parameters:
        f - The combining function
        cs - A vector of channels
        Returns:
        A zipping ReceivePort
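
        To make "each vector of messages" concrete, the sketch below takes one element from every input, packs them into an Object[] in input order, and hands that array to the combining function. It is an illustration only: ZipSketch is a hypothetical name, iterators stand in for receive ports, the standard-library Function replaces the Guava type, and stopping as soon as any input is exhausted is an assumption that the description above does not spell out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Iterator-based model of zipping several inputs; not Quasar code. */
final class ZipSketch {
    /** Combines the i-th elements of all sources into one message via f. */
    static <M> Iterator<M> zip(List<? extends Iterator<?>> sources, Function<Object[], M> f) {
        return new Iterator<M>() {
            @Override
            public boolean hasNext() {
                if (sources.isEmpty()) return false;
                for (Iterator<?> s : sources) {
                    if (!s.hasNext()) return false; // assumption: stop at the shortest input
                }
                return true;
            }

            @Override
            public M next() {
                if (!hasNext()) throw new NoSuchElementException();
                Object[] row = new Object[sources.size()];
                for (int i = 0; i < row.length; i++) row[i] = sources.get(i).next();
                return f.apply(row);
            }
        };
    }

    /** Zips [1, 2, 3] with ["a", "b"] into strings of the form "number:letter". */
    static List<String> demo() {
        List<Iterator<?>> sources = new ArrayList<>();
        sources.add(Arrays.asList(1, 2, 3).iterator());
        sources.add(Arrays.asList("a", "b").iterator());
        Function<Object[], String> join = row -> row[0] + ":" + row[1];
        Iterator<String> zipped = zip(sources, join);
        List<String> out = new ArrayList<>();
        while (zipped.hasNext()) out.add(zipped.next());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1:a, 2:b]
    }
}
```

        Because the list form erases the element types to Object[], the combining function has to cast; the fixed-arity overloads that follow avoid this by giving each input its own type parameter.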
      • zip

        public static <M,​S1,​S2> ReceivePort<M> zip​(ReceivePort<S1> c1,
                                                               ReceivePort<S2> c2,
                                                               co.paralleluniverse.common.util.Function2<S1,​S2,​M> f)
        Returns a ReceivePort that combines each vector of messages from a vector of channels into a single combined message.
        Type Parameters:
        M - The type of the combined message
        S1 - The message type of the first input port
        S2 - The message type of the second input port
        Parameters:
        f - The combining function
        c1 - The first input port
        c2 - The second input port
        Returns:
        A zipping ReceivePort
      • zip

        public static <M,​S1,​S2,​S3> ReceivePort<M> zip​(ReceivePort<S1> c1,
                                                                        ReceivePort<S2> c2,
                                                                        ReceivePort<S3> c3,
                                                                        co.paralleluniverse.common.util.Function3<S1,​S2,​S3,​M> f)
        Returns a ReceivePort that combines each vector of messages from a vector of channels into a single combined message.
        Type Parameters:
        M - The type of the combined message
        S1 - The message type of the first input port
        S2 - The message type of the second input port
        S3 - The message type of the third input port
        Parameters:
        f - The combining function
        c1 - The first input port
        c2 - The second input port
        c3 - The third input port
        Returns:
        A zipping ReceivePort
      • zip

        public static <M,​S1,​S2,​S3,​S4> ReceivePort<M> zip​(ReceivePort<S1> c1,
                                                                                 ReceivePort<S2> c2,
                                                                                 ReceivePort<S3> c3,
                                                                                 ReceivePort<S4> c4,
                                                                                 co.paralleluniverse.common.util.Function4<S1,​S2,​S3,​S4,​M> f)
        Returns a ReceivePort that combines each vector of messages from a vector of channels into a single combined message.
        Type Parameters:
        M - The type of the combined message
        S1 - The message type of the first input port
        S2 - The message type of the second input port
        S3 - The message type of the third input port
        S4 - The message type of the fourth input port
        Parameters:
        f - The combining function
        c1 - The first input port
        c2 - The second input port
        c3 - The third input port
        c4 - The fourth input port
        Returns:
        A zipping ReceivePort
      • zip

        public static <M,​S1,​S2,​S3,​S4,​S5> ReceivePort<M> zip​(ReceivePort<S1> c1,
                                                                                          ReceivePort<S2> c2,
                                                                                          ReceivePort<S3> c3,
                                                                                          ReceivePort<S4> c4,
                                                                                          ReceivePort<S5> c5,
                                                                                          co.paralleluniverse.common.util.Function5<S1,​S2,​S3,​S4,​S5,​M> f)
        Returns a ReceivePort that combines each vector of messages from a vector of channels into a single combined message.
        Type Parameters:
        M - The type of the combined message
        S1 - The message type of the first input port
        S2 - The message type of the second input port
        S3 - The message type of the third input port
        S4 - The message type of the fourth input port
        S5 - The message type of the fifth input port
        Parameters:
        f - The combining function
        c1 - The first input port
        c2 - The second input port
        c3 - The third input port
        c4 - The fourth input port
        c5 - The fifth input port
        Returns:
        A zipping ReceivePort
      • filterSend

        public static <M> SendPort<M> filterSend​(SendPort<M> channel,
                                                 com.google.common.base.Predicate<M> pred)
        Returns a SendPort that forwards to a given channel only those messages that satisfy a predicate. Messages that don't satisfy the predicate will be silently discarded when sent.

        The returned SendPort has the same hashCode as channel and is equal to it.

        Type Parameters:
        M - the message type.
        Parameters:
        channel - The channel to filter
        pred - the filtering predicate
        Returns:
        A SendPort that will send only those messages which satisfy the predicate (i.e. the predicate returns true) to the given channel.
      • mapSend

        public static <S,​T> SendPort<S> mapSend​(SendPort<T> channel,
                                                      com.google.common.base.Function<S,​T> f)
        Returns a SendPort that transforms messages by applying a given mapping function before sending them to a given channel.

        The returned SendPort has the same hashCode as channel and is equal to it.

        Type Parameters:
        S - the message type of the source (returned) channel.
        T - the message type of the target (given) channel.
        Parameters:
        channel - the channel to transform
        f - the mapping function
        Returns:
        a SendPort that passes messages to the given channel after transforming them by applying the mapping function.
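        The direction of the mapping (source type S in, target type T out) may be easier to see in a small model. This is a sketch under stated assumptions: Port is a hypothetical stand-in for SendPort, java.util.function.Function replaces the Guava function type, and MapSendSketch is an invented class name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative model only: Port is a hypothetical stand-in for SendPort.
class MapSendSketch {
    interface Port<M> { void send(M message); }

    // Returns a port of source type S that applies f and forwards the
    // result to a target port of type T.
    static <S, T> Port<S> mapSend(Port<T> target, Function<S, T> f) {
        return message -> target.send(f.apply(message));
    }

    static List<String> demo() {
        List<String> received = new ArrayList<>();
        Port<String> sink = received::add;
        Function<Integer, String> label = n -> "#" + n;
        Port<Integer> numbers = mapSend(sink, label);
        numbers.send(1);
        numbers.send(2);
        numbers.send(3);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [#1, #2, #3]
    }
}
```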
      • reduceSend

        public static <S,​T> SendPort<S> reduceSend​(SendPort<T> channel,
                                                         co.paralleluniverse.common.util.Function2<T,​S,​T> f,
                                                         T init)
        Returns a SendPort that transforms the messages it accepts by applying a reduction function before sending them to a given channel.

        The returned SendPort has the same hashCode as channel and is equal to it.

        Type Parameters:
        S - The message type of the source (returned) channel.
        T - The message type of the target (given) channel.
        Parameters:
        channel - The channel to transform.
        f - The reduction function.
        init - The initial input to the reduction function.
        Returns:
        a SendPort that passes to the given channel the results of applying the reduction function to the messages sent to it.
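        One plausible reading of the reduction, sketched without the library: each message sent is folded into an accumulator that starts at init, and the updated accumulator is forwarded to the target. That per-message forwarding is an assumption of this sketch, as are the stand-in Port interface, the use of java.util.function.BiFunction in place of Function2, and the class name ReduceSendSketch. The sketch is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Illustrative model only: Port is a hypothetical stand-in for SendPort.
class ReduceSendSketch {
    interface Port<M> { void send(M message); }

    // Assumed semantics: forward the running result after every message.
    static <S, T> Port<S> reduceSend(Port<T> target, BiFunction<T, S, T> f, T init) {
        return new Port<S>() {
            private T acc = init; // accumulator, seeded with the initial value

            @Override public void send(S message) {
                acc = f.apply(acc, message);
                target.send(acc);
            }
        };
    }

    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        Port<Integer> sink = received::add;
        BiFunction<Integer, Integer, Integer> sum = (total, n) -> total + n;
        Port<Integer> runningSum = reduceSend(sink, sum, 0);
        for (int i = 1; i <= 4; i++) {
            runningSum.send(i);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 3, 6, 10]
    }
}
```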
      • flatMapSend

        public static <S,​T> SendPort<S> flatMapSend​(FiberFactory fiberFactory,
                                                          Channel<S> pipe,
                                                          SendPort<T> channel,
                                                          com.google.common.base.Function<S,​ReceivePort<T>> f)
        Returns a SendPort that sends messages that are transformed by a given flat-mapping function into a given channel. Unlike map, the mapping function does not return a single output message for every input message, but a new ReceivePort. The messages from all the returned ports are concatenated and sent to the channel.

        To return a single value the mapping function can make use of singletonReceivePort. To return a collection, it can make use of toReceivePort(Iterable). To emit no values, the function can return emptyReceivePort() or null.

        If multiple producers send messages into the channel, the messages from the ReceivePorts returned by the mapping function may be interleaved with other messages.

        The returned SendPort has the same hashCode as channel and is equal to it.

        Type Parameters:
        S - the message type of the source (returned) channel.
        T - the message type of the target (given) channel.
        Parameters:
        pipe - an intermediate channel used in the flat-mapping operation. Messages are first sent to this channel before being transformed.
        channel - the channel to transform
        f - the mapping function
        Returns:
        a SendPort that passes to the given channel the messages produced by applying the mapping function to the messages sent to it.
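        The difference from a plain map can be sketched in a simplified, single-threaded model. Here Iterable stands in for the ReceivePort returned by the mapping function, Port is a hypothetical stand-in for SendPort, and the fiberFactory and pipe parameters are left out entirely; all of these are assumptions of the sketch, as is the class name FlatMapSendSketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Illustrative model only: Port stands in for SendPort and Iterable for
// the ReceivePort returned by the mapping function (both assumptions).
class FlatMapSendSketch {
    interface Port<M> { void send(M message); }

    // Each input message may expand to zero, one, or many output messages.
    static <S, T> Port<S> flatMapSend(Port<T> target, Function<S, Iterable<T>> f) {
        return message -> {
            Iterable<T> outputs = f.apply(message);
            if (outputs == null) {
                return; // a null result emits nothing
            }
            for (T out : outputs) {
                target.send(out);
            }
        };
    }

    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        Port<Integer> sink = received::add;
        // n is repeated n times; 2 is mapped to null to show the empty case.
        Function<Integer, Iterable<Integer>> repeat =
                n -> n == 2 ? null : Collections.nCopies(n, n);
        Port<Integer> expanding = flatMapSend(sink, repeat);
        expanding.send(1);
        expanding.send(2);
        expanding.send(3);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 3, 3, 3]
    }
}
```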
      • transformSend

        public static <M> TransformingSendPort<M> transformSend​(SendPort<M> channel)
        Returns a TransformingSendPort wrapping the given channel, which may be used for functional transformations.
        Type Parameters:
        M - the message type
        Parameters:
        channel - the channel to transform
        Returns:
        a TransformingSendPort wrapping the given channel
      • emptyReceivePort

        public static <T> ReceivePort<T> emptyReceivePort()
        Returns an empty ReceivePort. The port is closed and receives no messages.
      • singletonReceivePort

        public static <T> ReceivePort<T> singletonReceivePort​(T object)
        Returns a newly created ReceivePort that receives a single message: the object given to the function.
        Type Parameters:
        T - the message type
        Parameters:
        object - the single object that will be returned by the ReceivePort.
      • toReceivePort

        public static <T> ReceivePort<T> toReceivePort​(java.util.Iterator<T> iterator)
        Returns a newly created ReceivePort that receives all the elements iterated by the iterator.
        Type Parameters:
        T - the message type
        Parameters:
        iterator - the iterator to transform into a ReceivePort.
      • toReceivePort

        public static <T> ReceivePort<T> toReceivePort​(java.lang.Iterable<T> iterable)
        Returns a newly created ReceivePort that receives all the elements iterated by the iterable.
        Type Parameters:
        T - the message type
        Parameters:
        iterable - the iterable to transform into a ReceivePort.