Class AutoBatchMsgProc

  • All Implemented Interfaces:
    CipReplyConsumer, CipMsgProcessor

    public class AutoBatchMsgProc
    extends MultipleReqBatch
    implements CipReplyConsumer
    Extends MultipleReqBatch to track requests sent downstream in order to:
    • Determine when all sent requests receive a reply, in order to automatically flush any pending requests, and
    • Complete a future when all replies are received and no requests are pending.

    This permits the use of callbacks that queue more requests based on the results received. Multiple sources of additional requests will be merged together efficiently.

    The instantiator of this processor must ensure start() is called after all initial requests are queued.
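    The tracking behaviour described above can be illustrated with a minimal, self-contained sketch. It is not the actual implementation: InFlightTrackerSketch, queue(), onReply() and the String-based requests are hypothetical stand-ins for the real processor, CipRequest and CipReply types, and the choice to complete the future immediately when start() finds nothing to send is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical model of in-flight counting; not the real AutoBatchMsgProc. */
class InFlightTrackerSketch {
    private final Queue<String> pending = new ArrayDeque<>(); // queued, not yet sent
    private final Consumer<String> downstream;                // stand-in for the downstream processor
    private final CompletableFuture<Void> future = new CompletableFuture<>();
    private int inFlight = 0;                                 // requests sent but not yet answered
    private boolean started = false;

    InFlightTrackerSketch(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    /** Queue a request; it is sent on the next flush. */
    synchronized void queue(String req) {
        pending.add(req);
    }

    /** Send everything pending and count each request as in flight. */
    private void flush() {
        String req;
        while ((req = pending.poll()) != null) {
            inFlight++;
            downstream.accept(req);
        }
    }

    /** Count a reply; flush when nothing is in flight, complete if still idle. */
    synchronized void onReply(String reply) {
        inFlight--;
        if (started && inFlight == 0) {
            flush();
            if (inFlight == 0) {
                future.complete(null);
            }
        }
    }

    /** First flush; afterwards replies drive further flushes. */
    synchronized CompletableFuture<Void> start() {
        started = true;
        flush();
        if (inFlight == 0) {
            future.complete(null); // assumption: nothing was queued, so nothing to wait for
        }
        return future;
    }

    public static void main(String[] args) {
        List<String> wire = new ArrayList<>();        // records what was sent downstream
        InFlightTrackerSketch t = new InFlightTrackerSketch(wire::add);
        t.queue("read A");
        t.queue("read B");
        CompletableFuture<Void> done = t.start();     // sends A and B
        t.onReply("A ok");
        t.queue("read C");                            // follow-up queued by a callback
        t.onReply("B ok");                            // count reaches zero, so C is flushed
        System.out.println(done.isDone());            // prints "false"
        t.onReply("C ok");
        System.out.println(done.isDone());            // prints "true"
    }
}
```

    In this sketch, requests queued while others are still in flight are held back and sent together once the count reaches zero, which is the point at which follow-up requests from several callbacks end up in the same flush.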

    • Field Detail

      • future

        protected java.util.concurrent.CompletableFuture<?> future
      • inFlight

        protected int inFlight
    • Constructor Detail

      • AutoBatchMsgProc

        public AutoBatchMsgProc​(CipMsgProcessor cx,
                                boolean strictOrder)
        Create an automatic batching and tracking message processor, with or without strict ordering.
        Parameters:
        cx - The downstream message processor.
        strictOrder - Whether to continue queuing small requests when oversize requests are submitted.
    • Method Detail

      • outerSend

        protected void outerSend​(CipRequest req)
        Add this processor as a callback for messages sent downstream, and add to the count of requests in flight.
        Overrides:
        outerSend in class MultipleReqBatch
        Parameters:
        req - The request to send downstream.
      • accept

        public void accept​(CipReply reply)
        Deduct from the messages in flight when replies are received, flush automatically when the count reaches zero, and complete the future if the count is still zero after flushing.
        Specified by:
        accept in interface CipReplyConsumer
        Parameters:
        reply - The reply received for a request sent downstream.
      • start

        public java.util.concurrent.CompletableFuture<?> start()
        Kick off automatic flushing after ensuring a first flush.
        Returns:
        The future that completes when all submitted requests have received replies.
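
        A caller may want to block on the returned future with a timeout rather than wait indefinitely. The following is a sketch of one way to do that using only the standard CompletableFuture API; AwaitStartSketch and awaitAll are hypothetical helper names, and the futures in main stand in for the value returned by start().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper for waiting on the future returned by start(). */
class AwaitStartSketch {

    /** Returns true if the future completed normally within the timeout. */
    static boolean awaitAll(CompletableFuture<?> done, long timeoutMillis) {
        try {
            done.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;                          // replies still outstanding
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();    // preserve the interrupt status
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("batch failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Stand-ins for the future returned by start().
        CompletableFuture<Void> finished = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> outstanding = new CompletableFuture<>();
        System.out.println(awaitAll(finished, 100));     // prints "true"
        System.out.println(awaitAll(outstanding, 10));   // prints "false"
    }
}
```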