295 Find Median from Data Stream

The median is the middle value in an ordered integer list. If the size of the list is even, there is no single middle value, so the median is the mean of the two middle values.

Examples:

[2,3,4], the median is 3

[2,3], the median is (2 + 3) / 2 = 2.5

Design a data structure that supports the following two operations:

  • void addNum(int num) - Add an integer number from the data stream to the data structure.

  • double findMedian() - Return the median of all elements so far.

For example:

addNum(1)
addNum(2)
findMedian() -> 1.5
addNum(3) 
findMedian() -> 2

The solution keeps two priority queues: a max-queue holding the smaller half of the numbers and a min-queue holding the larger half. The median is then the average of the fronts of both queues when their sizes are the same, or the front of the larger queue otherwise. The biggest challenge of this problem is the maintenance of both queues. The comments in the code explain it in more detail, but what it comes down to is recognizing which queue a number belongs to and then, if needed, making the necessary adjustments to keep the two queues balanced in size.

Complexity: O(1) findMedian, O(log(n)) addNum, where n is the total number of elements inserted

from queue import PriorityQueue

class myPriorityQueue(PriorityQueue):
    """Min-priority queue that can also peek at its smallest item.

    Behaves exactly like ``queue.PriorityQueue`` (lowest value is retrieved
    first) and adds :meth:`front` for a non-destructive look at the item that
    the next ``get()`` would return.
    """

    def __init__(self, maxsize=0):
        """
        :param maxsize: upper bound on the number of items; 0 (the default)
            means unbounded, as in ``queue.PriorityQueue``
        """
        # Zero-argument super() keeps PriorityQueue itself in the
        # initialisation chain instead of jumping straight to Queue.
        super().__init__(maxsize)

    def front(self):
        """Return the smallest item without removing it.

        :raises IndexError: if the queue is empty
        """
        if self.empty():
            raise IndexError('Priority Queue is empty')
        # PriorityQueue stores its items as a heap in ``self.queue``, so the
        # smallest item is always at index 0.
        return self.queue[0]

class MaxPriorityQueue(PriorityQueue):
    """Max-priority queue for numeric items.

    Built on ``queue.PriorityQueue`` (a min-queue) by storing the negation of
    every item, so the largest original value is retrieved first.  Items must
    therefore support unary minus.
    """

    def __init__(self, maxsize=0):
        """
        :param maxsize: upper bound on the number of items; 0 (the default)
            means unbounded, as in ``queue.PriorityQueue``
        """
        # Zero-argument super() keeps PriorityQueue itself in the
        # initialisation chain instead of jumping straight to Queue.
        super().__init__(maxsize)

    def put(self, item, block=True, timeout=None):
        """Insert *item*; *block* and *timeout* behave as in ``Queue.put``."""
        # Forward block/timeout so that put_nowait() and timed puts keep
        # their documented semantics on a bounded queue.
        super().put(-item, block, timeout)

    def get(self, block=True, timeout=None):
        """Remove and return the largest item.

        *block* and *timeout* behave as in ``Queue.get``; in particular a
        non-blocking call on an empty queue raises ``queue.Empty``.
        """
        # Forward block/timeout: get_nowait() is implemented as
        # get(block=False) and would otherwise wait forever when empty.
        return -super().get(block, timeout)

    def front(self):
        """Return the largest item without removing it.

        :raises IndexError: if the queue is empty
        """
        if self.empty():
            raise IndexError('Priority Queue is empty')
        # Index 0 of the underlying heap is the smallest stored (negated)
        # value, i.e. the largest original item.
        return -self.queue[0]


class MedianFinder:
    """Running median of a stream of numbers, kept in two balanced queues.

    ``small_half`` is a max-queue holding the smaller half of the numbers and
    ``large_half`` is a min-queue holding the larger half.  After every call
    to :meth:`addNum`:

    * no item in ``small_half`` is greater than any item in ``large_half``;
    * ``small_half`` holds as many items as ``large_half``, or exactly one
      more.

    ``addNum`` performs a constant number of heap operations (O(log n));
    ``findMedian`` only peeks at the queue fronts (O(1)).
    """

    def __init__(self):
        """
        initialize your data structure here.
        """
        self.large_half = myPriorityQueue()
        self.small_half = MaxPriorityQueue()

    def addNum(self, num):
        """
        Add a number from the data stream.

        :type num: int
        :rtype: void
        """
        # Route the new number through the smaller half and hand that half's
        # maximum over to the larger half.  Whatever value crosses over is
        # >= everything left in small_half, so the ordering between the two
        # halves holds without any case analysis on num.
        self.small_half.put(num)
        self.large_half.put(self.small_half.get())

        # The hand-over may have left large_half one item ahead; give its
        # minimum back so small_half is never the shorter queue.
        if self.large_half.qsize() > self.small_half.qsize():
            self.small_half.put(self.large_half.get())

    def findMedian(self):
        """
        Return the median of all numbers added so far.

        With an odd count the middle element itself is returned; with an even
        count the result is the float mean of the two middle elements.

        :rtype: float
        :raises IndexError: if no number has been added yet
        """
        if self.small_half.empty():
            raise IndexError('Priority Queue is empty')
        if self.small_half.qsize() == self.large_half.qsize():
            # Even count: the two middle values are the fronts of the halves.
            return (self.large_half.front() + self.small_half.front()) / 2
        # Odd count: small_half holds the extra, middle element.
        return self.small_half.front()

Limited Testing

def _print_running_medians(finder, numbers):
    """Add each number to *finder*, printing the median after every insert.

    A blank line is printed at the end to separate the output of one
    scenario from the next.
    """
    for value in numbers:
        finder.addNum(value)
        print(finder.findMedian())
    print('')


# TEST: ascending (0, 1, ..., 9)
obj1 = MedianFinder()
_print_running_medians(obj1, range(10))

# TEST: descending (10, 9, ..., 0)
obj2 = MedianFinder()
_print_running_medians(obj2, range(10, -1, -1))

# TEST: mix (1, 2, 3 followed by 6, 5, 4)
obj3 = MedianFinder()
_print_running_medians(obj3, [*range(1, 4), *range(6, 3, -1)])

Last updated