Sorted Sets

Delayed Tasks

# See: https://redislabs.com/ebook/redis-in-action/part-2-core-concepts-2/chapter-6-application-components-in-redis/6-2-distributed-locking/6-2-3-building-a-lock-in-redis
def acquire_lock(conn, lockname, acquire_timeout=10):
    identifier = str(uuid.uuid4())

    end = time.time() + acquire_timeout
    while time.time() < end:
        if conn.setnx('lock:' + lockname, identifier):
            return identifier

        time.sleep(.001)

    return False


def release_lock(conn, lockname, identifier):
    pipe = conn.pipeline(True)
    lockname = 'lock:' + lockname

    while True:
        try:
            pipe.watch(lockname)
            if pipe.get(lockname) == identifier:
                pipe.multi()
                pipe.delete(lockname)
                pipe.execute()
                return True

            pipe.unwatch()
            break
        except redis.exceptions.WatchError:
            pass

    return False


# See: https://redislabs.com/ebook/redis-in-action/part-2-core-concepts-2/chapter-6-application-components-in-redis/6-4-task-queues/6-4-2-delayed-tasks
def execute_later(conn, queue, name, args, delay=0):
    identifier = str(uuid.uuid4())

    item = json.dumps([identifier, queue, name, args])

    if delay > 0:
        conn.zadd('delayed:', time.time() + delay, item)
    else:
        conn.rpush('queue:' + queue, item)

    return identifier


def poll_queue(conn):
    while True:
        item = conn.zrange('delayed:', 0, 0, withscores=True)

        if not item or item[0][1] > time.time():
            time.sleep(.01)
            continue

        item = item[0][0]
        identifier, queue, name, args = json.loads(item)

        locked = acquire_lock(conn, identifier)
        if not locked:
            continue

        # Special for XXX
        if queue == 'xxx':
            print 'XXX: {0}'.format(name)
            # xxx_fun(name)

        if conn.zrem('delayed:', item):
            conn.rpush('queue:' + queue, item)

        release_lock(conn, identifier, locked)

Sorted By Timestamp When Score Equals

def timestamps(self, desc=False):
    stamp = int(time.time() * 1000)
    return self.max_timestamp - stamp if desc else stamp


def rawscore(self, score):
    if not score:
        return 0.0
    return float(int(float(score) / self.rate))


def stampscore(self, score, desc=False):
    return score * self.rate + self.timestamps(desc)


def zaddwithstamps(self, conn, name, *args, **kwargs):
    desc = 'desc' in kwargs and kwargs.pop('desc')
    pieces = [item if index % 2 else self.stampscore(item, desc) for index, item in enumerate(args)]
    for pair in iteritems(kwargs):
        pieces.append(self.timestamps(pair[1], desc))
        pieces.append(pair[0])
        return conn.zadd(name, *pieces)


def zincrbywithstamps(self, conn, name, value, amount=1, desc=False):
    return conn.zadd(name, self.stampscore(self.rawscore(conn.zscore(name, value)) + amount, desc), value)
  • Usage

    In [10]: from redis_extensions import zaddwithstamps, zincrbywithstamps, rawscore
    
    In [11]: zaddwithstamps(r, 'sorted_set', 1, 'a')
    Out[11]: 1
    
    In [12]: zincrbywithstamps(r, 'sorted_set', 'b')
    Out[12]: 1
    
    In [13]: r.zrange('sorted_set', 0, 100, withscores=True)
    Out[13]: [('a', 11472180607330.0), ('b', 11472180610715.0)]
    
    In [14]: r.zrange('sorted_set', 0, 100, withscores=True, score_cast_func=rawscore)
    Out[14]: [('a', 1.0), ('b', 1.0)]
    
    In [15]: zincrbywithstamps(r, 'sorted_set', 'b')
    Out[15]: 0
    
    In [16]: zincrbywithstamps(r, 'sorted_set', 'a')
    Out[16]: 0
    
    In [17]: r.zrange('sorted_set', 0, 100, withscores=True, score_cast_func=rawscore)
    Out[17]: [('b', 2.0), ('a', 2.0)]
    

References

[1] redisclub/redis-extensions-py@Github, Redis-extensions is a collection of custom extensions for Redis-py.

results matching ""

    No results matching ""