Class ReactiveStreams
java.lang.Object
  co.paralleluniverse.strands.channels.reactivestreams.ReactiveStreams
public class ReactiveStreams extends java.lang.Object
Converts between Quasar channels and reactive streams.
-
-
Constructor Summary
- ReactiveStreams()
-
Method Summary
- static <T> ReceivePort<T> subscribe(int bufferSize, Channels.OverflowPolicy policy, org.reactivestreams.Publisher<T> publisher)
  Subscribes to a given Publisher and returns a ReceivePort to the subscription.
- static <T,R> org.reactivestreams.Processor<T,R> toProcessor(int bufferSize, Channels.OverflowPolicy policy, SuspendableAction2<? extends ReceivePort<? super T>,? extends SendPort<? extends R>> transformer)
  Turns a transformer into a Processor.
- static <T,R> org.reactivestreams.Processor<T,R> toProcessor(FiberFactory ff, int bufferSize, Channels.OverflowPolicy policy, SuspendableAction2<? extends ReceivePort<? super T>,? extends SendPort<? extends R>> transformer)
  Turns a transformer into a Processor.
- static <T> org.reactivestreams.Publisher<T> toPublisher(ReceivePort<T> channel)
  Turns a channel into a Publisher.
- static <T> org.reactivestreams.Publisher<T> toPublisher(ReceivePort<T> channel, FiberFactory ff)
  Turns a channel into a Publisher.
- static <T> org.reactivestreams.Publisher<T> toPublisher(Topic<T> topic)
  Turns a topic into a Publisher.
- static <T> org.reactivestreams.Publisher<T> toPublisher(Topic<T> topic, FiberFactory ff)
  Turns a topic into a Publisher.
-
-
-
Method Detail
-
subscribe
public static <T> ReceivePort<T> subscribe(int bufferSize, Channels.OverflowPolicy policy, org.reactivestreams.Publisher<T> publisher)
Subscribes to a given Publisher and returns a ReceivePort to the subscription. This creates an internal single-consumer channel that will receive the published elements.
Parameters:
  bufferSize - the size of the buffer of the internal channel; may be -1 for unbounded, but may not be 0
  policy - the Channels.OverflowPolicy of the internal channel
  publisher - the publisher to subscribe to
Returns:
  a ReceivePort which emits the elements published by the publisher
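The bridging idea behind subscribe can be sketched with JDK types alone. The sketch below is not Quasar code: it assumes java.util.concurrent.Flow as a stand-in for org.reactivestreams, a BlockingQueue as a stand-in for the internal channel and its ReceivePort, and a hypothetical DONE marker in place of channel closing. It always blocks when the buffer is full, which roughly corresponds to a blocking overflow policy; other policies are not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;

class SubscribeSketch {
    /** Hypothetical marker queued when the publisher completes or fails. */
    static final Object DONE = new Object();

    /** Subscribes to the publisher and returns a queue that receives its elements. */
    static <T> BlockingQueue<Object> subscribe(int bufferSize, Flow.Publisher<T> publisher) {
        if (bufferSize == 0)
            throw new IllegalArgumentException("bufferSize may not be 0");
        // A negative size means unbounded, mirroring the -1 convention.
        BlockingQueue<Object> port = bufferSize < 0
                ? new LinkedBlockingQueue<Object>()
                : new LinkedBlockingQueue<Object>(bufferSize);
        publisher.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // ask for one element at a time
            }
            @Override public void onNext(T item) {
                if (put(item)) subscription.request(1);
                else subscription.cancel();
            }
            @Override public void onError(Throwable t) { put(DONE); }
            @Override public void onComplete() { put(DONE); }

            /** Blocks while the buffer is full; returns false if interrupted. */
            private boolean put(Object o) {
                try {
                    port.put(o);
                    return true;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        });
        return port;
    }

    /** Receives elements until the completion marker arrives. */
    @SuppressWarnings("unchecked")
    static <T> List<T> drain(BlockingQueue<Object> port) {
        List<T> out = new ArrayList<>();
        try {
            for (Object o = port.take(); o != DONE; o = port.take()) out.add((T) o);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    static List<Integer> demo() {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        BlockingQueue<Object> port = subscribe(2, publisher);
        for (int i = 1; i <= 5; i++) publisher.submit(i);
        publisher.close();
        return drain(port);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Requesting one element at a time keeps the sketch short; a real bridge would more likely request in batches sized to the free buffer space.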
-
toPublisher
public static <T> org.reactivestreams.Publisher<T> toPublisher(ReceivePort<T> channel, FiberFactory ff)
Turns a channel into a Publisher. All items sent to the channel will be published by the publisher.
The publisher will allow a single subscription, unless the channel is a ticker channel, in which case multiple subscribers will be allowed and a new ticker consumer will be created for each.
Every subscription to the returned publisher creates an internal fiber that will receive items from the channel and publish them.
Parameters:
  channel - the channel
  ff - the FiberFactory to create the internal fiber(s); if null then a default factory is used
Returns:
  a new publisher for the channel's items
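The single-subscription rule and the per-subscription worker described above can be illustrated with a JDK-only sketch. It is not the Quasar implementation: it assumes java.util.concurrent.Flow in place of org.reactivestreams, a plain thread in place of the internal fiber, a BlockingQueue of Optional values in place of the channel (with Optional.empty() as a hypothetical "channel closed" signal), and it does not model ticker channels.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class ToPublisherSketch {
    /** Subscription handed to rejected subscribers; it ignores all calls. */
    static final Flow.Subscription NOOP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    /** Publishes the queue's items to at most one subscriber. */
    static <T> Flow.Publisher<T> toPublisher(BlockingQueue<Optional<T>> channel) {
        AtomicBoolean subscribed = new AtomicBoolean();
        return subscriber -> {
            if (!subscribed.compareAndSet(false, true)) {
                subscriber.onSubscribe(NOOP);
                subscriber.onError(new IllegalStateException("single subscription only"));
                return;
            }
            Semaphore demand = new Semaphore(0); // elements requested but not yet sent
            AtomicBoolean cancelled = new AtomicBoolean();
            Thread worker = new Thread(() -> {
                try {
                    while (!cancelled.get()) {
                        demand.acquire();                  // wait for request(n)
                        Optional<T> item = channel.take(); // wait for the next item
                        if (!item.isPresent()) {
                            subscriber.onComplete();
                            return;
                        }
                        subscriber.onNext(item.get());
                    }
                } catch (InterruptedException e) {
                    // cancelled while waiting: stop publishing
                }
            });
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) {
                    if (n <= 0) return; // a complete implementation would signal onError here
                    long room = Integer.MAX_VALUE - demand.availablePermits();
                    demand.release((int) Math.min(n, room));
                }
                @Override public void cancel() {
                    cancelled.set(true);
                    worker.interrupt();
                }
            });
            worker.start();
        };
    }

    static List<String> demo() {
        BlockingQueue<Optional<String>> channel = new LinkedBlockingQueue<>();
        channel.add(Optional.of("a"));
        channel.add(Optional.of("b"));
        channel.add(Optional.empty()); // "close" the channel
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        toPublisher(channel).subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(String item) { received.add(item); subscription.request(1); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }
}
```

In this sketch completion is only signalled once there is outstanding demand, a simplification that a production bridge would not need to make.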
-
toPublisher
public static <T> org.reactivestreams.Publisher<T> toPublisher(ReceivePort<T> channel)
Turns a channel into a Publisher. All items sent to the channel will be published by the publisher.
The publisher will allow a single subscription, unless the channel is a ticker channel, in which case multiple subscribers will be allowed and a new ticker consumer will be created for each.
Every subscription to the returned publisher creates an internal fiber that will receive items from the channel and publish them.
Calling this method is the same as calling toPublisher(channel, null).
Parameters:
  channel - the channel
Returns:
  a new publisher for the channel's items
-
toPublisher
public static <T> org.reactivestreams.Publisher<T> toPublisher(Topic<T> topic, FiberFactory ff)
Turns a topic into a Publisher. All items sent to the topic will be published by the publisher.
A new transfer channel (i.e. a blocking channel with a buffer of size 0) subscribed to the topic will be created for every subscriber.
Every subscription to the returned publisher creates an internal fiber that will receive items from the subscription's channel and publish them.
Parameters:
  topic - the topic
  ff - the FiberFactory to create the internal fiber(s); if null then a default factory is used
Returns:
  a new publisher for the topic's items
-
toPublisher
public static <T> org.reactivestreams.Publisher<T> toPublisher(Topic<T> topic)
Turns a topic into a Publisher. All items sent to the topic will be published by the publisher.
A new transfer channel (i.e. a blocking channel with a buffer of size 0) subscribed to the topic will be created for every subscriber.
Every subscription to the returned publisher creates an internal fiber that will receive items from the subscription's channel and publish them.
Calling this method is the same as calling toPublisher(topic, null).
Parameters:
  topic - the topic
Returns:
  a new publisher for the topic's items
-
toProcessor
public static <T,R> org.reactivestreams.Processor<T,R> toProcessor(FiberFactory ff, int bufferSize, Channels.OverflowPolicy policy, SuspendableAction2<? extends ReceivePort<? super T>,? extends SendPort<? extends R>> transformer)
Turns a transformer into a Processor. The transformer will run in its own fiber.
Type Parameters:
  T - the type of elements flowing into the transformer
  R - the type of elements flowing out of the transformer
Parameters:
  ff - the FiberFactory to create the internal fiber(s); if null then a default factory is used
  bufferSize - the size of the buffer of the internal channel; may be -1 for unbounded, but may not be 0
  policy - the Channels.OverflowPolicy of the internal channel
  transformer - a function that reads from its input channel and writes to its output channel
Returns:
  a Processor running the given transformer
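To make the shape of a transformer concrete, here is a JDK-only sketch of a function that reads from an input channel and writes to an output channel while running in its own worker. It does not build a Processor and is not Quasar code: the Transformer interface is a hypothetical stand-in for SuspendableAction2, a thread stands in for the fiber, and BlockingQueue values wrapped in Optional stand in for the ReceivePort and SendPort, with Optional.empty() as an assumed "closed" signal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class TransformerSketch {
    /** Hypothetical stand-in for SuspendableAction2: reads from in, writes to out. */
    interface Transformer<T, R> {
        void call(BlockingQueue<Optional<T>> in, BlockingQueue<Optional<R>> out)
                throws InterruptedException;
    }

    /** Starts the transformer in its own thread (a fiber in Quasar). */
    static <T, R> Thread start(BlockingQueue<Optional<T>> in,
                               BlockingQueue<Optional<R>> out,
                               Transformer<T, R> transformer) {
        Thread worker = new Thread(() -> {
            try {
                transformer.call(in, out);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        return worker;
    }

    static List<Integer> demo() {
        BlockingQueue<Optional<String>> in = new LinkedBlockingQueue<>();
        BlockingQueue<Optional<Integer>> out = new LinkedBlockingQueue<>();
        start(in, out, (source, sink) -> {
            // Map each incoming string to its length until the input is closed.
            for (Optional<String> s = source.take(); s.isPresent(); s = source.take())
                sink.put(Optional.of(s.get().length()));
            sink.put(Optional.empty()); // propagate the close downstream
        });
        in.add(Optional.of("a"));
        in.add(Optional.of("bb"));
        in.add(Optional.of("ccc"));
        in.add(Optional.empty()); // "close" the input
        List<Integer> results = new ArrayList<>();
        try {
            for (Optional<Integer> r = out.take(); r.isPresent(); r = out.take())
                results.add(r.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }
}
```

Because the transformer owns both ends of its loop, it can map, filter, or batch elements freely; here it maps each string to its length.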
-
toProcessor
public static <T,R> org.reactivestreams.Processor<T,R> toProcessor(int bufferSize, Channels.OverflowPolicy policy, SuspendableAction2<? extends ReceivePort<? super T>,? extends SendPort<? extends R>> transformer)
Turns a transformer into a Processor. The transformer will run in its own fiber.
Same as calling toProcessor(null, bufferSize, policy, transformer).
Type Parameters:
  T - the type of elements flowing into the transformer
  R - the type of elements flowing out of the transformer
Parameters:
  bufferSize - the size of the buffer of the internal channel; may be -1 for unbounded, but may not be 0
  policy - the Channels.OverflowPolicy of the internal channel
  transformer - a function that reads from its input channel and writes to its output channel
Returns:
  a Processor running the given transformer
-
-