Class CommitProcessor

All Implemented Interfaces:
Runnable, RequestProcessor

public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor
This RequestProcessor matches the incoming committed requests with the locally submitted requests. The trick is that locally submitted requests that change the state of the system will come back as incoming committed requests, so we need to match them up. Instead of just waiting for the committed requests, we process the uncommitted requests that belong to other sessions. The CommitProcessor is multi-threaded. Communication between threads is handled via queues, atomics, and wait/notifyAll synchronized on the processor. The CommitProcessor acts as a gateway for allowing requests to continue with the remainder of the processing pipeline. It will allow many read requests but only a single write request to be in flight simultaneously, thus ensuring that write requests are processed in transaction id order. - 1 commit processor main thread, which watches the request queues and assigns requests to worker threads based on their sessionId so that read and write requests for a particular session are always assigned to the same thread (and hence are guaranteed to run in order). - 0-N worker threads, which run the rest of the request processor pipeline on the requests. If configured with 0 worker threads, the primary commit processor thread runs the pipeline directly. Typical (default) thread counts are: on a 32 core machine, 1 commit processor thread and 32 worker threads. Multi-threading constraints: - Each session's requests must be processed in order. - Write requests must be processed in zxid order - Must ensure no race condition between writes in one session that would trigger a watch being set by a read request in another session The current implementation solves the third constraint by simply allowing no read requests to be processed in parallel with write requests.
  • Field Details

    • ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS

      public static final String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS
      Default: numCores
      See Also:
    • ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT

      public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT
      Default worker pool shutdown timeout in ms: 5000 (5s)
      See Also:
    • ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE

      public static final String ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE
      Default max read batch size: -1 to disable the feature
      See Also:
    • ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE

      public static final String ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE
      Default max commit batch size: 1
      See Also:
    • queuedRequests

      protected LinkedBlockingQueue<Request> queuedRequests
      Incoming requests.
    • queuedWriteRequests

      protected final LinkedBlockingQueue<Request> queuedWriteRequests
      Incoming requests that are waiting on a commit, contained in order of arrival
    • committedRequests

      protected final LinkedBlockingQueue<Request> committedRequests
      Requests that have been committed.
    • pendingRequests

      protected final Map<Long,Deque<Request>> pendingRequests
      Requests that we are holding until commit comes in. Keys represent session ids, each value is a linked list of the session's requests.
    • numRequestsProcessing

      protected final AtomicInteger numRequestsProcessing
      The number of requests currently being processed
    • stoppedMainLoop

      protected volatile boolean stoppedMainLoop
      For testing purposes, we use a separated stopping condition for the outer loop.
    • stopped

      protected volatile boolean stopped
    • workerPool

      protected WorkerService workerPool
  • Constructor Details

  • Method Details

    • needCommit

      protected boolean needCommit(Request request)
    • run

      public void run()
      Specified by:
      run in interface Runnable
      Overrides:
      run in class Thread
    • endOfIteration

      protected void endOfIteration()
    • waitForEmptyPool

      protected void waitForEmptyPool() throws InterruptedException
      Throws:
      InterruptedException
    • start

      public void start()
      Overrides:
      start in class Thread
    • getMaxReadBatchSize

      public static int getMaxReadBatchSize()
    • getMaxCommitBatchSize

      public static int getMaxCommitBatchSize()
    • setMaxReadBatchSize

      public static void setMaxReadBatchSize(int size)
    • setMaxCommitBatchSize

      public static void setMaxCommitBatchSize(int size)
    • commit

      public void commit(Request request)
    • processRequest

      public void processRequest(Request request)
      Specified by:
      processRequest in interface RequestProcessor
    • shutdown

      public void shutdown()
      Specified by:
      shutdown in interface RequestProcessor