优先队列


优先队列基于堆(Heap),是一种通过优先级进行调度的策略。在输入元素时,将通过Comparable实现优先级的对比,进而实现按照优先级“插队”的调度,举个例子

  • Picasso在加载图片时内部的线程池使用了优先队列进行线程的调度,进而实现多个不同图片优先级的显示(这里通过优先队列实现了OkHttp的网络请求)。更广泛地说,只要涉及到调度策略,肯定要与优先队列有关系。
  • MQ中间件: 在后台的订单派发、抢购活动中,基本上都涉及到MQ的优先级
  • 同样在笔试中,用于解决TopK问题

为了简化目标,本文仅分析JDK(基于1.8.0_101)下 PriorityQueue 的入队与出队操作。

什么是优先队列?

优先队列(PriorityQueue)由堆(heap)构成,即一个完全二叉树,在本文中,子(child)节点总是小于父(parent)节点。

     2
 3      5
5 7   6

`(3 `(3 5 7) `(5 6))

在JDK实现的PriorityQueue中,当输入元素时,首先将元素放到最下层的叶节点(数组的末尾),接着从叶节点向上爬,并取代比它小的元素;当移除元素时,首先移除最高的根节点,然后下面向上填坑,由下面的小元素将它取而代之。

“堆”密度大的沉在下面,密度小的飘到上面。

HEAP与BST的区别

假设这是一个树``(p l r)`

  • BST需要保证 l < p < r
  • 而Heap需要保证 p < (or l r),同级的排序不涉及,因此设计与插入维护更加简单

入队操作

入队是最常见的操作,将数据放到堆的末端叶子,一般用于生产者,比如将一个含优先级的请求放入线程池,必须实现Comparable或者Comparator接口。源代码如下

//添加元素到队尾
public boolean offer(E e) {
  if (e == null)
    throw new NullPointerException();
  modCount++;
  int i = size;
  //如果需要扩容
  if (i >= queue.length)
    // 通过System.arraycopy 扩容加一
    grow(i + 1);
  size = i + 1;
  if (i == 0)
    queue[0] = e;
  els
    siftUp(i, e);
  return true;
}

在siftUp中代码如下,通过此处可以看出,JDK中默认采用的是小堆(Heaps where the parent key is less than the child keys are called min-heaps),也就是堆顶(queue[0])是数组中最小的值,接着调用了 siftUp 方法进行“升迁”排序,具体实现如下

//如果小于Parent,就替换掉它
//k: queue.size();
//E: Data to insert
private void siftUpComparable(int k, E x) {
  Comparable<? super E> key = (Comparable<? super E>) x;
  // 首与parent对比,如果比它小或等,就替换掉。
  while (k > 0) {
    int parent = (k - 1) >>> 1;//也就是 除以2
    Object e = queue[parent];
    if (key.compareTo((E) e) >= 0)
      break;
    queue[k] = e;
    k = parent;
  }
  queue[k] = key;
}

举个例子,比如我要依次插入[6,3,7,9,2,1],Scheme代码如下

(offer () 6) = (6 nil nil)
(offer (6) 3) = (3 6 nil)
(offer (3 6) 7) = (3 6 7)
(offer (3 6 7) 9) = (3 (2 9 nil) 7)
(offer (3 (6 9 nil) 7) 2) = (2 (3 9 6) 7)
(offer (2 (3 9 6) 7) 1) = (1 (3 9 6) (2 7 nil))

从上面的例子可以看出,第一个元素,也就是根节点总是最小的值,这个sitfup也就是插队的流程。

这里也可以看出,优先队列内部并不是全量依次排序的

提取元素

此场景在消费者中经常使用,从堆顶中拿出最小的元素,空缺位置从下向上依此提拔。

public E poll() {
  if (size == 0)
    return null;
  int s = --size;
  modCount++;
  //队首最小的元素
  E result = queue[0];
  //队尾元素
  E x = queue[s];
  queue[s] = null;
  if (s != 0)
    //开始内部替换,并取而代之queue[0]
    siftDown(0, x);
  return result;
}

具体套路是这样的

//k=0 x=leaf, 
private void siftDownComparable(int k, E x) {
  int half = size >>> 1;        // loop while a non-leaf
  //不断向下
  while (k < half) {
    // getSmallerChild start
    int child = (k << 1) + 1; // assume left child is least
    E c = queue[child];
    int right = child + 1;
    if (right < size && c.compareTo(queue[right]) > 0){
      c = queue[child = right];      
    }
    // getSmallerChild end
    if (x.compareTo(c) <= 0)
      break;
    queue[k] = c;
    k = child;
  }
  queue[k] = x;
}

总之,这个与公司的职级晋升很像。

优先队列的迭代

优先队列的默认实现是数组从左到右边依此输出,等价于树(从左到右的)广度遍历BFS,但是由于左右叶子没有规范大小,因此输出是毫无意义的。唯一有意义的就是最小值queue[0],因此如果需要排序结果,那么只能用循环实现,而且还只能从小到大 (或者像下面这样从0插入)。

//descending order
public <T, R> List<R> itrQueueDesc(Queue<T> queue, Function<T, R> function) {
  List<R> list = new ArrayList<>(); //array runs faster than the link in LeetCode, haha
  while (!queue.isEmpty()) {
    T poll = queue.poll();
    list.add(0, function.apply(poll));
  }
  return list;
}

toString 也是同理,没有输出意义

OJ的TopK例子

维护一个固定长度为K的队列,时间复杂度为logN,占用内存为N

//215. Kth Largest Element in an Array https://leetcode.com/problems/kth-largest-element-in-an-array/
public <T> Queue<T> fixedQueue(Collection<T> ts, int k, Comparator<T> comparator) {
  Queue<T> queue = new PriorityQueue<>(k, comparator);
  for (T t : ts) {
    // 此处有判断size再插入的方法,但是性能差距不大
    queue.add(t);
    if (queue.size() > k) {
      queue.poll();
    }
  }
  return queue;
}

OJ的WordFreq问题

词频统计(CountBy),先GroupBy,后TopN,注意这个与Elastic的InvertedIndex是没有关系的

// 688. https://leetcode.com/problems/top-k-frequent-words
public List<String> topKFrequent(String[] words, int k) {
  // count by 
  // 此部分在Eureka的一致性Hash中也有出现,用于校验增量更新,比如UP_3_DOWN_2
  Map<String, Long> collect = new HashMap<>();
  for (String w : words) {
    collect.put(w, collect.getOrDefault(w, 0l) + 1);
  }
  Queue<Map.Entry<String, Long>> queue = fixedQueue(collect.entrySet(), k, 
    (newVal, oldVal) -> {
      //  from highest to lowest
      int i = newVal.getValue().compareTo(oldVal.getValue());
      if (i == 0) {
        return oldVal.getKey().compareTo(newVal.getKey()); //lower alphabetical comes first
      }
      return i;
  });
  return itrQueueDesc(queue, Map.Entry::getKey);
}

此外还有基于Redis的ZSET的实现