Class ReactiveStreams


  • public class ReactiveStreams
    extends java.lang.Object
    Converts between Quasar channels and reactive streams
    • Constructor Detail

      • ReactiveStreams

        public ReactiveStreams()
    • Method Detail

      • subscribe

        public static <T> ReceivePort<T> subscribe​(int bufferSize,
                                                   Channels.OverflowPolicy policy,
                                                   org.reactivestreams.Publisher<T> publisher)
        Subscribes to a given Publisher and return a ReceivePort to the subscription. This creates an internal single consumer channel that will receive the published elements.
        Parameters:
        bufferSize - the size of the buffer of the internal channel; may be -1 for unbounded, but may not be 0)
        policy - the Channels.OverflowPolicy of the internal channel.
        publisher - the subscriber
        Returns:
        A ReceivePort which emits the elements published by the subscriber
      • toPublisher

        public static <T> org.reactivestreams.Publisher<T> toPublisher​(ReceivePort<T> channel,
                                                                       FiberFactory ff)
        Turns a channel to a Publisher. All items sent to the channel will be published by the publisher.

        The publisher will allow a single subscription, unless the channel is a ticker channel in which case, multiple subscribers will be allowed, and a new ticker consumer will be created for each.

        Every subscription to the returned publisher creates an internal fiber, that will receive items from the channel and publish them.

        Parameters:
        channel - the channel
        ff - the FiberFactory to create the internal fiber(s); if null then a default factory is used.
        Returns:
        a new publisher for the channel's items
      • toPublisher

        public static <T> org.reactivestreams.Publisher<T> toPublisher​(ReceivePort<T> channel)
        Turns a channel to a Publisher. All items sent to the channel will be published by the publisher.

        The publisher will allow a single subscription, unless the channel is a ticker channel in which case, multiple subscribers will be allowed, and a new ticker consumer will be created for each.

        Every subscription to the returned publisher creates an internal fiber, that will receive items from the channel and publish them.

        Calling this method is the same as calling toPublisher(channel, null)

        Parameters:
        channel - the channel
        Returns:
        a new publisher for the channel's items
      • toPublisher

        public static <T> org.reactivestreams.Publisher<T> toPublisher​(Topic<T> topic,
                                                                       FiberFactory ff)
        Turns a topic to a Publisher. All items sent to the topic will be published by the publisher.

        A new transfer channel (i.e. a blocking channel with a buffer of size 0) subscribed to the topic will be created for every subscriber.

        Every subscription to the returned publisher creates an internal fiber, that will receive items from the subscription's channel and publish them.

        Parameters:
        topic - the topic
        ff - the FiberFactory to create the internal fiber(s); if null then a default factory is used.
        Returns:
        a new publisher for the topic's items
      • toPublisher

        public static <T> org.reactivestreams.Publisher<T> toPublisher​(Topic<T> topic)
        Turns a topic to a Publisher. All items sent to the topic will be published by the publisher.

        A new transfer channel (i.e. a blocking channel with a buffer of size 0) subscribed to the topic will be created for every subscriber.

        Every subscription to the returned publisher creates an internal fiber, that will receive items from the subscription's channel and publish them.

        Calling this method is the same as calling toPublisher(channel, null)

        Parameters:
        topic - the topic
        Returns:
        a new publisher for the topic's items
      • toProcessor

        public static <T,​R> org.reactivestreams.Processor<T,​R> toProcessor​(FiberFactory ff,
                                                                                       int bufferSize,
                                                                                       Channels.OverflowPolicy policy,
                                                                                       SuspendableAction2<? extends ReceivePort<? super T>,​? extends SendPort<? extends R>> transformer)
        Turns a transformer into a Publisher. The transformer will run in its own fiber.
        Type Parameters:
        T - the type of elements flowing into the transformer
        R - the type of elements flowing out of the transformer
        Parameters:
        ff - the FiberFactory to create the internal fiber(s); if null then a default factory is used.
        bufferSize - the size of the buffer of the internal channel; may be -1 for unbounded, but may not be 0)
        policy - the Channels.OverflowPolicy of the internal channel.
        transformer - a function that reads from it's input channel and writes to its output channel
        Returns:
        a Processor running the given transformer.
      • toProcessor

        public static <T,​R> org.reactivestreams.Processor<T,​R> toProcessor​(int bufferSize,
                                                                                       Channels.OverflowPolicy policy,
                                                                                       SuspendableAction2<? extends ReceivePort<? super T>,​? extends SendPort<? extends R>> transformer)
        Turns a transformer into a Publisher. The transformer will run in its own fiber.

        Same as calling toProcessor(null, bufferSize, policy, transformer)

        Type Parameters:
        T - the type of elements flowing into the transformer
        R - the type of elements flowing out of the transformer
        Parameters:
        bufferSize - the size of the buffer of the internal channel; may be -1 for unbounded, but may not be 0)
        policy - the Channels.OverflowPolicy of the internal channel.
        transformer - a function that reads from it's input channel and writes to its output channel
        Returns:
        a Processor running the given transformer.