Class TransformingReceivePort<T>
- java.lang.Object
  - co.paralleluniverse.strands.channels.DelegatingReceivePort<T>
    - co.paralleluniverse.strands.channels.TransformingReceivePort<T>

All Implemented Interfaces:
co.paralleluniverse.common.util.DelegatingEquals, Port<T>, PortAutoCloseable, ReceivePort<T>, java.lang.AutoCloseable

public class TransformingReceivePort<T> extends DelegatingReceivePort<T>

A ReceivePort with additional functional-transform operations, usually wrapping a plain ReceivePort.

Nested Class Summary
Nested classes/interfaces inherited from interface co.paralleluniverse.strands.channels.ReceivePort:
ReceivePort.EOFException

Field Summary
Fields inherited from class co.paralleluniverse.strands.channels.DelegatingReceivePort:
target

Method Summary

<U> TransformingReceivePort<U> fiberTransform(FiberFactory fiberFactory, SuspendableAction2<? extends ReceivePort<? super T>,? extends SendPort<? extends U>> transformer, Channel<U> out)
    Spawns a fiber that transforms values read from this channel and writes values to the out channel.

<U> TransformingReceivePort<U> fiberTransform(SuspendableAction2<? extends ReceivePort<? super T>,? extends SendPort<? extends U>> transformer, Channel<U> out)
    Spawns a fiber that transforms values read from this channel and writes values to the out channel.

TransformingReceivePort<T> filter(com.google.common.base.Predicate<T> pred)
    Returns a TransformingReceivePort that passes on only those messages from this channel that satisfy a predicate.

<U> TransformingReceivePort<U> flatMap(com.google.common.base.Function<T,ReceivePort<U>> f)
    Returns a TransformingReceivePort that receives messages from this channel transformed by a given flat-mapping function.

void forEach(SuspendableAction1<T> action)
    Performs the given action on each message received by this channel.

<U> TransformingReceivePort<U> map(com.google.common.base.Function<T,U> f)
    Returns a TransformingReceivePort that receives messages from this channel transformed by a given mapping function.

<U> TransformingReceivePort<U> reduce(co.paralleluniverse.common.util.Function2<U,T,U> f, U init)
    Returns a TransformingReceivePort that receives messages from this channel transformed by a given reduction function.

TransformingReceivePort<T> take(long count)
    Returns a TakeReceivePort that can provide at most count messages from the underlying channel.

Methods inherited from class co.paralleluniverse.strands.channels.DelegatingReceivePort:
close, equals, hashCode, isClosed, receive, receive, receive, toString, tryReceive

Method Detail

filter
public TransformingReceivePort<T> filter(com.google.common.base.Predicate<T> pred)

Returns a TransformingReceivePort that passes on only those messages from this channel that satisfy a predicate. All messages (even those not satisfying the predicate) will be consumed from the original channel; those that don't satisfy the predicate will be silently discarded.

The returned TransformingReceivePort has the same hashCode as this channel and is equal to it.

Parameters:
pred - the filtering predicate
Returns:
A TransformingReceivePort that will receive all those messages from the original channel which satisfy the predicate (i.e. the predicate returns true).
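
The consume-and-discard behavior described for filter can be sketched in plain Java. This is only a model under stated assumptions: SimplePort, CountingSource, and drain are hypothetical stand-ins introduced for illustration, not Quasar types, and a null return from receive() is used here to mean "closed".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Illustrative model only: SimplePort is a hypothetical stand-in for a receive
// port, not a Quasar type. receive() returns null once the source is exhausted.
class FilterSketch {
    interface SimplePort<T> {
        T receive();
    }

    // Wraps an iterator as a port and counts how many messages were taken from it.
    static final class CountingSource<T> implements SimplePort<T> {
        private final Iterator<T> it;
        int consumed = 0;

        CountingSource(Iterator<T> it) {
            this.it = it;
        }

        @Override
        public T receive() {
            if (!it.hasNext()) return null;
            consumed++;
            return it.next();
        }
    }

    // Keeps receiving from the source until a message satisfies the predicate;
    // rejected messages are consumed from the source and dropped.
    static <T> SimplePort<T> filter(SimplePort<T> source, Predicate<T> pred) {
        return () -> {
            T m;
            while ((m = source.receive()) != null) {
                if (pred.test(m)) return m;
            }
            return null; // source closed
        };
    }

    // Receives until the port reports that it is closed.
    static <T> List<T> drain(SimplePort<T> port) {
        List<T> out = new ArrayList<>();
        T m;
        while ((m = port.receive()) != null) out.add(m);
        return out;
    }

    public static void main(String[] args) {
        CountingSource<Integer> source =
                new CountingSource<>(Arrays.asList(1, 2, 3, 4, 5, 6).iterator());
        List<Integer> evens = drain(filter(source, n -> n % 2 == 0));
        System.out.println(evens);           // the messages that satisfied the predicate
        System.out.println(source.consumed); // how many messages left the source
    }
}
```

In this model the filtered port delivers three messages while six are taken from the source, which is the point the description makes: filtering does not leave rejected messages behind for other receivers.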

map
public <U> TransformingReceivePort<U> map(com.google.common.base.Function<T,U> f)

Returns a TransformingReceivePort that receives messages from this channel transformed by a given mapping function.

The returned TransformingReceivePort has the same hashCode as this channel and is equal to it.

Parameters:
f - the mapping function
Returns:
a TransformingReceivePort that returns messages that are the result of applying the mapping function to the messages received on this channel.

reduce
public <U> TransformingReceivePort<U> reduce(co.paralleluniverse.common.util.Function2<U,T,U> f, U init)

Returns a TransformingReceivePort that receives messages from this channel transformed by a given reduction function.

The returned TransformingReceivePort has the same hashCode as this channel and is equal to it.

Parameters:
f - The reduction function.
init - The initial input to the reduction function.
Returns:
a ReceivePort that returns messages that are the result of applying the reduction function to the messages received on this channel.

flatMap
public <U> TransformingReceivePort<U> flatMap(com.google.common.base.Function<T,ReceivePort<U>> f)

Returns a TransformingReceivePort that receives messages from this channel transformed by a given flat-mapping function. Unlike map, the mapping function does not return a single output message for every input message, but a new ReceivePort. All the returned ports are concatenated into a single ReceivePort that receives the messages received by all the ports in order.

To return a single value, the mapping function can make use of Channels.singletonReceivePort(Object). To return a collection, it can make use of Channels.toReceivePort(Iterable). To emit no values, the function can return Channels.emptyReceivePort() or null.

The returned TransformingReceivePort can only be safely used by a single receiver strand.

The returned TransformingReceivePort has the same hashCode as this channel and is equal to it.

Parameters:
f - the mapping function
Returns:
a TransformingReceivePort that returns messages that are the result of applying the mapping function to the messages received on this channel.
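
The in-order concatenation described for flatMap can be modeled in plain Java. This is a rough sketch, not Quasar's implementation: java.util.Iterator stands in for ReceivePort, the result is built eagerly rather than on demand, and expand is a hypothetical mapping function chosen to cover the single-value, collection, empty, and null cases.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Illustrative model only: Iterator stands in for ReceivePort, and the output is
// collected eagerly. None of these names are Quasar APIs.
class FlatMapSketch {
    // Applies f to each input and appends the resulting sub-sequences in order.
    // A null or empty result contributes no output.
    static <T, U> List<U> flatMap(Iterator<T> source, Function<T, Iterator<U>> f) {
        List<U> out = new ArrayList<>();
        while (source.hasNext()) {
            Iterator<U> inner = f.apply(source.next());
            if (inner == null) continue;
            while (inner.hasNext()) out.add(inner.next());
        }
        return out;
    }

    // Hypothetical mapping function covering the four cases named in the text.
    static Iterator<String> expand(int n) {
        if (n == 1) return Collections.singletonList("a").iterator(); // single value
        if (n == 2) return Arrays.asList("b", "c").iterator();        // collection
        if (n == 3) return Collections.<String>emptyIterator();       // no values
        return null;                                                  // also no values
    }

    public static void main(String[] args) {
        Function<Integer, Iterator<String>> f = FlatMapSketch::expand;
        // Inputs 2, 0, 1, 3, 2 expand to [b, c], nothing, [a], nothing, [b, c].
        System.out.println(flatMap(Arrays.asList(2, 0, 1, 3, 2).iterator(), f));
    }
}
```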

take
public TransformingReceivePort<T> take(long count)

Returns a TakeReceivePort that can provide at most count messages from the underlying channel.

The returned TransformingReceivePort has the same hashCode as this channel and is equal to it.

Parameters:
count - The maximum number of messages extracted from the underlying channel.
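
The "at most count" bound can be illustrated with a small plain-Java model. It is a sketch under assumptions: java.util.Iterator stands in for the underlying channel, take and toList are hypothetical helpers, and nothing here reflects how TakeReceivePort is actually implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Illustrative model only: Iterator stands in for the underlying channel.
class TakeSketch {
    // Yields messages from the source until either the source ends or
    // count messages have been handed out, whichever comes first.
    static <T> Iterator<T> take(Iterator<T> source, long count) {
        return new Iterator<T>() {
            private long remaining = count;

            @Override
            public boolean hasNext() {
                return remaining > 0 && source.hasNext();
            }

            @Override
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                remaining--;
                return source.next();
            }
        };
    }

    static <T> List<T> toList(Iterator<T> it) {
        List<T> out = new ArrayList<>();
        while (it.hasNext()) out.add(it.next());
        return out;
    }

    public static void main(String[] args) {
        // More messages available than requested: only the first three are provided.
        System.out.println(toList(take(Arrays.asList(1, 2, 3, 4, 5).iterator(), 3)));
        // Fewer messages available than requested: all of them are provided.
        System.out.println(toList(take(Arrays.asList(1, 2).iterator(), 5)));
    }
}
```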

fiberTransform
public <U> TransformingReceivePort<U> fiberTransform(SuspendableAction2<? extends ReceivePort<? super T>,? extends SendPort<? extends U>> transformer, Channel<U> out)

Spawns a fiber that transforms values read from this channel and writes values to the out channel.

When the transformation terminates, the output channel is automatically closed. If the transformation terminates abnormally (throws an exception), the output channel is closed with that exception.

Parameters:
out - the output channel
transformer - the transforming operation
Returns:
A TransformingReceivePort wrapping the out channel.

fiberTransform
public <U> TransformingReceivePort<U> fiberTransform(FiberFactory fiberFactory, SuspendableAction2<? extends ReceivePort<? super T>,? extends SendPort<? extends U>> transformer, Channel<U> out)

Spawns a fiber that transforms values read from this channel and writes values to the out channel.

When the transformation terminates, the output channel is automatically closed. If the transformation terminates abnormally (throws an exception), the output channel is closed with that exception.

Parameters:
fiberFactory - will be used to create the fiber
out - the output channel
transformer - the transforming operation
Returns:
A TransformingReceivePort wrapping the out channel.
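
The close-on-termination contract of fiberTransform can be sketched with plain Java concurrency primitives. This is a model under stated assumptions, not Quasar code: a daemon thread stands in for the fiber, and SimpleChannel, Transformer, transform, and drain are hypothetical names. In this model a receiver sees a failure as an IllegalStateException; the exception type Quasar surfaces is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative model only: none of these types are Quasar APIs. A plain thread
// stands in for the fiber, and SimpleChannel for the input and output channels.
class FiberTransformSketch {
    static final class SimpleChannel<T> {
        private static final Object EOF = new Object();

        private static final class Failure {
            final Throwable cause;
            Failure(Throwable cause) { this.cause = cause; }
        }

        private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        private Object terminal; // close marker; assumes a single receiver thread

        void send(T message) { queue.add(message); }
        void close() { queue.add(EOF); }
        void close(Throwable cause) { queue.add(new Failure(cause)); }

        // Returns the next message, null after a normal close, or throws if the
        // channel was closed with an exception.
        @SuppressWarnings("unchecked")
        T receive() throws InterruptedException {
            if (terminal == null) {
                Object next = queue.take();
                if (next != EOF && !(next instanceof Failure)) return (T) next;
                terminal = next;
            }
            if (terminal instanceof Failure) {
                throw new IllegalStateException("producer failed", ((Failure) terminal).cause);
            }
            return null;
        }
    }

    interface Transformer<T, U> {
        void run(SimpleChannel<T> in, SimpleChannel<U> out) throws Exception;
    }

    // Runs the transformer on its own thread and closes the output when it ends.
    static <T, U> SimpleChannel<U> transform(SimpleChannel<T> in, Transformer<T, U> transformer,
                                             SimpleChannel<U> out) {
        Thread worker = new Thread(() -> {
            try {
                transformer.run(in, out);
                out.close();   // normal termination closes the output
            } catch (Throwable t) {
                out.close(t);  // abnormal termination closes it with the exception
            }
        });
        worker.setDaemon(true);
        worker.start();
        return out;
    }

    // Receives until the channel is closed; a failed close surfaces as an exception.
    static <U> List<U> drain(SimpleChannel<U> channel) {
        List<U> result = new ArrayList<>();
        try {
            U m;
            while ((m = channel.receive()) != null) result.add(m);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        SimpleChannel<Integer> in = new SimpleChannel<>();
        in.send(1);
        in.send(2);
        in.send(3);
        in.close();
        SimpleChannel<String> out = transform(in, (i, o) -> {
            Integer n;
            while ((n = i.receive()) != null) o.send("#" + n);
        }, new SimpleChannel<String>());
        System.out.println(drain(out));
    }
}
```

Unlike map or filter, the transformer here owns the whole read/write loop, so it may emit any number of output messages per input; the caller only observes the output channel and its eventual close.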

forEach
public void forEach(SuspendableAction1<T> action) throws SuspendExecution, java.lang.InterruptedException

Performs the given action on each message received by this channel. This method returns only after all messages have been consumed and the channel has been closed.

Parameters:
action - the action to perform on each received message
Throws:
SuspendExecution
java.lang.InterruptedException