优先队列
优先队列基于堆(Heap),是一种通过优先级进行调度的策略。在输入元素时,将通过Comparable实现优先级的对比,进而实现按照优先级“插队”的调度,举个例子
- Picasso在加载图片时内部的线程池使用了优先队列进行线程的调度,进而实现多个不同图片优先级的显示(这里通过优先队列实现了OkHttp的网络请求)。更广泛地说,只要涉及到调度策略,肯定要与优先队列有关系。
- MQ中间件: 在后台的订单派发、抢购活动中,基本上都涉及到MQ的优先级
- 同样在笔试中,用于解决TopK问题
为了简化目标,本文仅分析JDK(基于1.8.0_101)下 PriorityQueue 的入队与出队操作。
什么是优先队列?
优先队列(PriorityQueue)由堆(heap)构成,即一个完全二叉树,在本文中,子(child)节点总是小于父(parent)节点。
2
3 5
5 7 6
`(3 `(3 5 7) `(5 6))
在JDK实现的PriorityQueue中,当输入元素时,首先将元素放到最下层的叶节点(数组的末尾),接着从叶节点向上爬,并取代比它小的元素;当移除元素时,首先移除最高的根节点,然后下面向上填坑,由下面的小元素将它取而代之。
“堆”密度大的沉在下面,密度小的飘到上面。
HEAP与BST的区别
假设这是一个树``(p l r)`
- BST需要保证 l < p < r
- 而Heap需要保证 p < (or l r),同级的排序不涉及,因此设计与插入维护更加简单
入队操作
入队是最常见的操作,将数据放到堆的末端叶子,一般用于生产者,比如将一个含优先级的请求放入线程池,必须实现Comparable或者Comparator接口。源代码如下
//添加元素到队尾
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
modCount++;
int i = size;
//如果需要扩容
if (i >= queue.length)
// 通过System.arraycopy 扩容加一
grow(i + 1);
size = i + 1;
if (i == 0)
queue[0] = e;
els
siftUp(i, e);
return true;
}
在siftUp中代码如下,通过此处可以看出,JDK中默认采用的是小堆(Heaps where the parent key is less than the child keys are called min-heaps),也就是堆顶(queue[0])是数组中最小的值,接着调用了 siftUp 方法进行“升迁”排序,具体实现如下
//如果小于Parent,就替换掉它
//k: queue.size();
//E: Data to insert
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
// 首与parent对比,如果比它小或等,就替换掉。
while (k > 0) {
int parent = (k - 1) >>> 1;//也就是 除以2
Object e = queue[parent];
if (key.compareTo((E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = key;
}
举个例子,比如我要依次插入[6,3,7,9,2,1]
,Scheme代码如下
(offer () 6) = (6 nil nil)
(offer (6) 3) = (3 6 nil)
(offer (3 6) 7) = (3 6 7)
(offer (3 6 7) 9) = (3 (2 9 nil) 7)
(offer (3 (6 9 nil) 7) 2) = (2 (3 9 6) 7)
(offer (2 (3 9 6) 7) 1) = (1 (3 9 6) (2 7 nil))
从上面的例子可以看出,第一个元素,也就是根节点总是最小的值,这个sitfup
也就是插队的流程。
这里也可以看出,优先队列内部并不是全量依次排序的
提取元素
此场景在消费者中经常使用,从堆顶中拿出最小的元素,空缺位置从下向上依此提拔。
public E poll() {
if (size == 0)
return null;
int s = --size;
modCount++;
//队首最小的元素
E result = queue[0];
//队尾元素
E x = queue[s];
queue[s] = null;
if (s != 0)
//开始内部替换,并取而代之queue[0]
siftDown(0, x);
return result;
}
具体套路是这样的
//k=0 x=leaf,
private void siftDownComparable(int k, E x) {
int half = size >>> 1; // loop while a non-leaf
//不断向下
while (k < half) {
// getSmallerChild start
int child = (k << 1) + 1; // assume left child is least
E c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0){
c = queue[child = right];
}
// getSmallerChild end
if (x.compareTo(c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = x;
}
总之,这个与公司的职级晋升很像。
优先队列的迭代
优先队列的默认实现是数组从左到右边依此输出,等价于树(从左到右的)广度遍历BFS,但是由于左右叶子没有规范大小,因此输出是毫无意义的。唯一有意义的就是最小值queue[0]
,因此如果需要排序结果,那么只能用循环实现,而且还只能从小到大 (或者像下面这样从0插入)。
//descending order
public <T, R> List<R> itrQueueDesc(Queue<T> queue, Function<T, R> function) {
List<R> list = new ArrayList<>(); //array runs faster than the link in LeetCode, haha
while (!queue.isEmpty()) {
T poll = queue.poll();
list.add(0, function.apply(poll));
}
return list;
}
toString 也是同理,没有输出意义
OJ的TopK例子
维护一个固定长度为K的队列,时间复杂度为logN,占用内存为N
//215. Kth Largest Element in an Array https://leetcode.com/problems/kth-largest-element-in-an-array/
public <T> Queue<T> fixedQueue(Collection<T> ts, int k, Comparator<T> comparator) {
Queue<T> queue = new PriorityQueue<>(k, comparator);
for (T t : ts) {
// 此处有判断size再插入的方法,但是性能差距不大
queue.add(t);
if (queue.size() > k) {
queue.poll();
}
}
return queue;
}
OJ的WordFreq问题
词频统计(CountBy),先GroupBy,后TopN,注意这个与Elastic的InvertedIndex是没有关系的
// 688. https://leetcode.com/problems/top-k-frequent-words
public List<String> topKFrequent(String[] words, int k) {
// count by
// 此部分在Eureka的一致性Hash中也有出现,用于校验增量更新,比如UP_3_DOWN_2
Map<String, Long> collect = new HashMap<>();
for (String w : words) {
collect.put(w, collect.getOrDefault(w, 0l) + 1);
}
Queue<Map.Entry<String, Long>> queue = fixedQueue(collect.entrySet(), k,
(newVal, oldVal) -> {
// from highest to lowest
int i = newVal.getValue().compareTo(oldVal.getValue());
if (i == 0) {
return oldVal.getKey().compareTo(newVal.getKey()); //lower alphabetical comes first
}
return i;
});
return itrQueueDesc(queue, Map.Entry::getKey);
}
此外还有基于Redis的ZSET的实现