public class

BlockingConcurrentWindowCounter

extends Object
package com.netflix.astyanax.util;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class BlockingConcurrentWindowCounter {
    private final PriorityBlockingQueue<Integer> queue;
    private volatile int tail = 0;
    private volatile int head = 0;
    private final Semaphore semaphore;

    public BlockingConcurrentWindowCounter(int size) {
        this(size, 0);
    }

    public BlockingConcurrentWindowCounter(int size, int init) {
        this.queue = new PriorityBlockingQueue<Integer>(size);
        this.semaphore = new Semaphore(size);
        this.head = this.tail = init;
    }

    public int incrementAndGet() throws Exception {
        semaphore.acquire();
        synchronized (this) {
            return head++;
        }
    }

    public int incrementAndGet(long timeout, TimeUnit unit) throws Exception {
        semaphore.tryAcquire(timeout, unit);
        synchronized (this) {
            return head++;
        }
    }

    public synchronized int release(int index) {
        int count = 0;
        this.queue.add(index);
        while (!this.queue.isEmpty() && this.queue.peek() == tail) {
            this.queue.remove();
            tail++;
            count++;
        }
        if (count > 0)
            semaphore.release(count);
        return count;
    }
}