diff --git a/src/blocked.c b/src/blocked.c
index 1d59ee16a..61fd9fa87 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -135,7 +135,9 @@ void processUnblockedClients(void) {
 /* Unblock a client calling the right function depending on the kind
  * of operation the client is blocking for. */
 void unblockClient(client *c) {
-    if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) {
+    if (c->btype == BLOCKED_LIST ||
+        c->btype == BLOCKED_ZSET ||
+        c->btype == BLOCKED_STREAM) {
         unblockClientWaitingData(c);
     } else if (c->btype == BLOCKED_WAIT) {
         unblockClientWaitingReplicas(c);
@@ -162,7 +164,9 @@ void unblockClient(client *c) {
  * send it a reply of some kind. After this function is called,
  * unblockClient() will be called with the same client as argument. */
 void replyToBlockedClientTimedOut(client *c) {
-    if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) {
+    if (c->btype == BLOCKED_LIST ||
+        c->btype == BLOCKED_ZSET ||
+        c->btype == BLOCKED_STREAM) {
         addReply(c,shared.nullmultibulk);
     } else if (c->btype == BLOCKED_WAIT) {
         addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
@@ -244,7 +248,7 @@ void handleClientsBlockedOnKeys(void) {
                         client *receiver = clientnode->value;
 
                         if (receiver->btype != BLOCKED_LIST) {
-                            /* Put on the tail, so that at the next call
+                            /* Put at the tail, so that at the next call
                              * we'll not run into it again. */
                             listDelNode(clients,clientnode);
                             listAddNodeTail(clients,receiver);
@@ -289,6 +293,43 @@ void handleClientsBlockedOnKeys(void) {
                  * when an element was pushed on the list. */
             }
 
+            /* Serve clients blocked on sorted set key. */
+            else if (o != NULL && o->type == OBJ_ZSET) {
+                dictEntry *de;
+
+                /* We serve clients in the same order they blocked for
+                 * this key, from the first blocked to the last. */
+                de = dictFind(rl->db->blocking_keys,rl->key);
+                if (de) {
+                    list *clients = dictGetVal(de);
+                    int numclients = listLength(clients);
+
+                    while(numclients--) {
+                        listNode *clientnode = listFirst(clients);
+                        client *receiver = clientnode->value;
+
+                        if (receiver->btype != BLOCKED_ZSET) {
+                            /* Put at the tail, so that at the next call
+                             * we'll not run into it again. */
+                            listDelNode(clients,clientnode);
+                            listAddNodeTail(clients,receiver);
+                            continue;
+                        }
+
+                        int reverse =   (receiver->lastcmd &&
+                                        receiver->lastcmd->proc == bzpopCommand) ?
+                                        0 : 1;
+                        unblockClient(receiver);
+                        genericZpopCommand(receiver,&rl->key,1,reverse);
+
+                        propagate(reverse ?
+                            server.zrevpopCommand : server.zpopCommand,
+                            receiver->db->id,receiver->argv,receiver->argc,
+                            PROPAGATE_AOF|PROPAGATE_REPL);
+                    }
+                }
+            }
+
             /* Serve clients blocked on stream key. */
             else if (o != NULL && o->type == OBJ_STREAM) {
                 dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
@@ -371,8 +412,9 @@ void handleClientsBlockedOnKeys(void) {
     }
 }
 
-/* This is how the current blocking lists/streams work, we use BLPOP as
- * example, but the concept is the same for other list ops and XREAD.
+/* This is how the current blocking lists/sorted sets/streams work, we use
+ * BLPOP as example, but the concept is the same for other list ops, sorted
+ * sets and XREAD.
  * - If the user calls BLPOP and the key exists and contains a non empty list
  *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
  *   if blocking is not required.
@@ -389,14 +431,14 @@ void handleClientsBlockedOnKeys(void) {
  *   to the number of elements we have in the ready list.
  */
 
-/* Set a client in blocking mode for the specified key (list or stream), with
- * the specified timeout. The 'type' argument is BLOCKED_LIST or BLOCKED_STREAM
- * depending on the kind of operation we are waiting for an empty key in
- * order to awake the client. The client is blocked for all the 'numkeys'
- * keys as in the 'keys' argument. When we block for stream keys, we also
- * provide an array of streamID structures: clients will be unblocked only
- * when items with an ID greater or equal to the specified one is appended
- * to the stream. */
+/* Set a client in blocking mode for the specified key (list, zset or stream),
+ * with the specified timeout. The 'type' argument is BLOCKED_LIST,
+ * BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are
+ * waiting for an empty key in order to awake the client. The client is blocked
+ * for all the 'numkeys' keys as in the 'keys' argument. When we block for
+ * stream keys, we also provide an array of streamID structures: clients will
+ * be unblocked only when items with an ID greater or equal to the specified
+ * one is appended to the stream. */
 void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {
     dictEntry *de;
     list *l;
@@ -409,7 +451,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
 
     for (j = 0; j < numkeys; j++) {
         /* The value associated with the key name in the bpop.keys dictionary
-         * is NULL for lists, or the stream ID for streams. */
+         * is NULL for lists and sorted sets, or the stream ID for streams. */
         void *key_data = NULL;
         if (btype == BLOCKED_STREAM) {
             key_data = zmalloc(sizeof(streamID));
diff --git a/src/db.c b/src/db.c
index 5f733e2d8..6c039e129 100644
--- a/src/db.c
+++ b/src/db.c
@@ -169,7 +169,9 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
     int retval = dictAdd(db->dict, copy, val);
 
     serverAssertWithInfo(NULL,key,retval == DICT_OK);
-    if (val->type == OBJ_LIST) signalKeyAsReady(db, key);
+    if (val->type == OBJ_LIST ||
+        val->type == OBJ_ZSET)
+        signalKeyAsReady(db, key);
     if (server.cluster_enabled) slotToKeyAdd(key);
 }
 
diff --git a/src/server.c b/src/server.c
index 404429beb..22317be71 100644
--- a/src/server.c
+++ b/src/server.c
@@ -198,6 +198,10 @@ struct redisCommand redisCommandTable[] = {
     {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0},
     {"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0},
     {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
+    {"zpop",zpopCommand,-2,"wF",0,NULL,1,-1,1,0,0},
+    {"zrevpop",zrevpopCommand,-2,"wF",0,NULL,1,-1,1,0,0},
+    {"bzpop",bzpopCommand,-2,"wsF",0,NULL,1,-2,1,0,0},
+    {"bzrevpop",bzrevpopCommand,-2,"wsF",0,NULL,1,-2,1,0,0},
     {"hset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
     {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0},
     {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0},
@@ -1369,6 +1373,8 @@ void createSharedObjects(void) {
     shared.rpop = createStringObject("RPOP",4);
     shared.lpop = createStringObject("LPOP",4);
     shared.lpush = createStringObject("LPUSH",5);
+    shared.zpop = createStringObject("ZPOP",4);
+    shared.zrevpop = createStringObject("ZREVPOP",7);
     for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
         shared.integers[j] =
             makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
@@ -1562,6 +1568,8 @@ void initServerConfig(void) {
     server.lpushCommand = lookupCommandByCString("lpush");
     server.lpopCommand = lookupCommandByCString("lpop");
     server.rpopCommand = lookupCommandByCString("rpop");
+    server.zpopCommand = lookupCommandByCString("zpop");
+    server.zrevpopCommand = lookupCommandByCString("zrevpop");
     server.sremCommand = lookupCommandByCString("srem");
     server.execCommand = lookupCommandByCString("exec");
     server.expireCommand = lookupCommandByCString("expire");
diff --git a/src/server.h b/src/server.h
index 172e99c8a..a8039dde0 100644
--- a/src/server.h
+++ b/src/server.h
@@ -258,7 +258,8 @@ typedef long long mstime_t; /* millisecond time type. */
 #define BLOCKED_WAIT 2    /* WAIT for synchronous replication. */
 #define BLOCKED_MODULE 3  /* Blocked by a loadable module. */
 #define BLOCKED_STREAM 4  /* XREAD. */
-#define BLOCKED_NUM 5     /* Number of blocked states. */
+#define BLOCKED_ZSET 5    /* BZPOP et al. */
+#define BLOCKED_NUM 6     /* Number of blocked states. */
 
 /* Client request types */
 #define PROTO_REQ_INLINE 1
@@ -646,7 +647,7 @@ typedef struct blockingState {
     mstime_t timeout;       /* Blocking operation timeout. If UNIX current time
                              * is > timeout then the operation timed out. */
 
-    /* BLOCKED_LIST and BLOCKED_STREAM */
+    /* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM */
     dict *keys;             /* The keys we are waiting to terminate a blocking
                              * operation such as BLPOP or XREAD. Or NULL. */
     robj *target;           /* The key that should receive the element,
@@ -762,7 +763,7 @@ struct sharedObjectsStruct {
     *masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
     *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
     *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
-    *rpop, *lpop, *lpush, *emptyscan,
+    *rpop, *lpop, *lpush, *zpop, *zrevpop, *emptyscan,
     *select[PROTO_SHARED_SELECT_CMDS],
     *integers[OBJ_SHARED_INTEGERS],
     *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
@@ -960,8 +961,8 @@ struct redisServer {
     off_t loading_process_events_interval_bytes;
     /* Fast pointers to often looked up command */
     struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
-                        *rpopCommand, *sremCommand, *execCommand,
-                        *expireCommand, *pexpireCommand, *xclaimCommand;
+                        *rpopCommand, *zpopCommand, *zrevpopCommand, *sremCommand,
+                        *execCommand, *expireCommand, *pexpireCommand, *xclaimCommand;
     /* Fields used only for stats */
     time_t stat_starttime;          /* Server start time */
     long long stat_numcommands;     /* Number of processed commands */
@@ -1628,6 +1629,7 @@ unsigned long zslGetRank(zskiplist *zsl, double score, sds o);
 int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore);
 long zsetRank(robj *zobj, sds ele, int reverse);
 int zsetDel(robj *zobj, sds ele);
+void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse);
 sds ziplistGetObject(unsigned char *sptr);
 int zslValueGteMin(double value, zrangespec *spec);
 int zslValueLteMax(double value, zrangespec *spec);
@@ -1968,6 +1970,10 @@ void zremCommand(client *c);
 void zscoreCommand(client *c);
 void zremrangebyscoreCommand(client *c);
 void zremrangebylexCommand(client *c);
+void zpopCommand(client *c);
+void zrevpopCommand(client *c);
+void bzpopCommand(client *c);
+void bzrevpopCommand(client *c);
 void multiCommand(client *c);
 void execCommand(client *c);
 void discardCommand(client *c);
diff --git a/src/t_zset.c b/src/t_zset.c
index f7f4c6eb2..2b11c7d37 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -3068,3 +3068,147 @@ void zscanCommand(client *c) {
         checkType(c,o,OBJ_ZSET)) return;
     scanGenericCommand(c,o,cursor);
 }
+
+/* This command implements the generic zpop operation, used by:
+ * ZPOP, ZREVPOP, BZPOP and BZREVPOP */
+void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) {
+    int idx;
+    robj *key;
+    robj *zobj;
+    sds ele;
+    double score;
+    char *events[2] = {"zpop","zrevpop"};
+
+    // Check type and break on the first error, otherwise identify candidate
+    idx = 0;
+    while (idx < keyc) {
+        key = keyv[idx++];
+        zobj = lookupKeyWrite(c->db,key);
+        if (!zobj) continue;
+        if (checkType(c,zobj,OBJ_ZSET)) return;
+        break;
+    }
+
+    // No candidate for zpopping, return empty
+    if (!zobj) {
+        addReply(c,shared.emptymultibulk);
+        return;
+    }
+
+    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
+        unsigned char *zl = zobj->ptr;
+        unsigned char *eptr, *sptr;
+        unsigned char *vstr;
+        unsigned int vlen;
+        long long vlong;
+
+        // Get the first or last element in the sorted set
+        eptr = ziplistIndex(zl,reverse ? -2 : 0);
+        serverAssertWithInfo(c,zobj,eptr != NULL);
+        
+        // There must be an element in the sorted set
+        serverAssertWithInfo(c,zobj,eptr != NULL);
+        serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
+        if (vstr == NULL)
+            ele = sdsfromlonglong(vlong);
+        else
+            ele = sdsnewlen(vstr,vlen);
+
+        // Get the score
+        sptr = ziplistNext(zl,eptr);
+        serverAssertWithInfo(c,zobj,sptr != NULL);
+        score = zzlGetScore(sptr);
+    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
+        zset *zs = zobj->ptr;
+        zskiplist *zsl = zs->zsl;
+        zskiplistNode *ln;
+
+        // Get the first or last element in the sorted set
+        ln = (reverse ? zsl->tail : zsl->header->level[0].forward);
+
+        // There must be an element in the sorted set
+        serverAssertWithInfo(c,zobj,ln != NULL);
+        ele = sdsdup(ln->ele);
+        score = ln->score;
+    } else {
+        serverPanic("Unknown sorted set encoding");
+    }
+
+    // Remove the element
+    serverAssertWithInfo(c,zobj,zsetDel(zobj,ele));
+    server.dirty++;
+    signalModifiedKey(c->db,key);
+    notifyKeyspaceEvent(NOTIFY_ZSET,events[reverse],key,c->db->id);
+
+    // Remove the key, if indeed needed
+    if (zsetLength(zobj) == 0) {
+        dbDelete(c->db,key);
+        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
+    }
+
+    addReplyMultiBulkLen(c,3);
+    addReplyBulk(c,key);
+    addReplyDouble(c,score);
+    addReplyBulkCBuffer(c,ele,sdslen(ele));
+    sdsfree(ele);
+}
+
+// ZPOP key [key ...]
+void zpopCommand(client *c) {
+    genericZpopCommand(c,&c->argv[1],c->argc-1,0);
+}
+
+// ZREVPOP key [key ...]
+void zrevpopCommand(client *c) {
+    genericZpopCommand(c,&c->argv[1],c->argc-1,1);
+}
+
+/* Blocking Z[REV]POP */
+void blockingGenericZpopCommand(client *c, int reverse) {
+    robj *o;
+    mstime_t timeout;
+    int j;
+
+    if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
+        != C_OK) return;
+
+    for (j = 1; j < c->argc-1; j++) {
+        o = lookupKeyWrite(c->db,c->argv[j]);
+        if (o != NULL) {
+            if (o->type != OBJ_ZSET) {
+                addReply(c,shared.wrongtypeerr);
+                return;
+            } else {
+                if (zsetLength(o) != 0) {
+                    /* Non empty zset, this is like a normal Z[REV]POP. */
+                    genericZpopCommand(c,&c->argv[j],1,reverse);
+                    /* Replicate it as an Z[REV]POP instead of BZ[REV]POP. */
+                    rewriteClientCommandVector(c,2,
+                        reverse ? shared.zrevpop : shared.zpop,
+                        c->argv[j]);
+                    return;
+                }
+            }
+        }
+    }
+
+    /* If we are inside a MULTI/EXEC and the zset is empty the only thing
+     * we can do is treating it as a timeout (even with timeout 0). */
+    if (c->flags & CLIENT_MULTI) {
+        addReply(c,shared.nullmultibulk);
+        return;
+    }
+
+    /* If the keys do not exist we must block */
+    blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
+}
+
+// BZPOP key [key ...] timeout
+void bzpopCommand(client *c) {
+    blockingGenericZpopCommand(c,0);
+}
+
+// BZREVPOP key [key ...] timeout
+void bzrevpopCommand(client *c) {
+    blockingGenericZpopCommand(c,1);
+}
diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl
index 564825ae9..797045797 100644
--- a/tests/unit/type/zset.tcl
+++ b/tests/unit/type/zset.tcl
@@ -648,6 +648,79 @@ start_server {tags {"zset"}} {
                 }
             }
         }
+
+        test "Basic Z\[REV\]POP with a single key - $encoding" {
+            r del zset
+            assert_equal {} [r zpop zset]
+            create_zset zset {-1 a 1 b 2 c 3 d 4 e}
+            assert_equal {zset -1 a} [r zpop zset]
+            assert_equal {zset 1 b} [r zpop zset]
+            assert_equal {zset 4 e} [r zrevpop zset]
+            assert_equal {zset 3 d} [r zrevpop zset]
+            assert_equal {zset 2 c} [r zpop zset]
+            assert_equal 0 [r exists zset]
+            r set foo bar
+            assert_error "*WRONGTYPE*" {r zpop foo}
+        }
+
+        test "Z\[REV\]POP with multiple keys - $encoding" {
+            r del z1 z2 z3 foo
+            r set foo bar
+            assert_equal {} [r zpop z1 z2 z3]
+            assert_error "*WRONGTYPE*" {r zpop z1 foo}
+            create_zset z1 {0 a 1 b 2 c}
+            assert_equal {z1 0 a} [r zpop z1 z2 z3]
+            assert_equal {z1 1 b} [r zpop z3 z2 z1]
+            create_zset z3 {0 a 1 b 2 c}
+            assert_equal {z3 2 c} [r zrevpop z3 z2 z1]
+            assert_equal 1 [r exists z1]
+            assert_equal 1 [r exists z3]
+        }
+
+        test "BZ\[REV\]POP with a single existing sorted set - $encoding" {
+            set rd [redis_deferring_client]
+            create_zset zset {0 a 1 b 2 c}
+
+            $rd bzpop zset 5
+            assert_equal {zset 0 a} [$rd read]
+            $rd bzpop zset 5
+            assert_equal {zset 1 b} [$rd read]
+            $rd bzrevpop zset 5
+            assert_equal {zset 2 c} [$rd read]
+            assert_equal 0 [r exists zset]
+        }
+
+        test "BZ\[REV\]POP with multiple existing sorted sets - $encoding" {
+            set rd [redis_deferring_client]
+            create_zset z1 {0 a 1 b 2 c}
+            create_zset z2 {3 d 4 e 5 f}
+
+            $rd bzpop z1 z2 5
+            assert_equal {z1 0 a} [$rd read]
+            $rd bzrevpop z1 z2 5
+            assert_equal {z1 2 c} [$rd read]
+            assert_equal 1 [r zcard z1]
+            assert_equal 3 [r zcard z2]
+
+            $rd bzrevpop z2 z1 5
+            assert_equal {z2 5 f} [$rd read]
+            $rd bzpop z2 z1 5
+            assert_equal {z2 3 d} [$rd read]
+            assert_equal 1 [r zcard z1]
+            assert_equal 1 [r zcard z2]
+        }
+
+        test "BZ\[REV\]POP second sorted set has members - $encoding" {
+            set rd [redis_deferring_client]
+            r del z1
+            create_zset z2 {3 d 4 e 5 f}
+            $rd bzrevpop z1 z2 5
+            assert_equal {z2 5 f} [$rd read]
+            $rd bzpop z2 z1 5
+            assert_equal {z2 3 d} [$rd read]
+            assert_equal 0 [r zcard z1]
+            assert_equal 1 [r zcard z2]
+        }
     }
 
     basics ziplist
@@ -1025,6 +1098,91 @@ start_server {tags {"zset"}} {
             }
             assert_equal {} $err
         }
+
+        test "BZPOP, ZADD + DEL should not awake blocked client" {
+            set rd [redis_deferring_client]
+            r del zset
+
+            $rd bzpop zset 0
+            r multi
+            r zadd zset 0 foo
+            r del zset
+            r exec
+            r del zset
+            r zadd zset 1 bar
+            $rd read
+        } {zset 1 bar}
+
+        test "BZPOP, ZADD + DEL + SET should not awake blocked client" {
+            set rd [redis_deferring_client]
+            r del list
+
+            r del zset
+
+            $rd bzpop zset 0
+            r multi
+            r zadd zset 0 foo
+            r del zset
+            r set zset foo
+            r exec
+            r del zset
+            r zadd zset 1 bar
+            $rd read
+        } {zset 1 bar}
+
+        test "BZPOP with same key multiple times should work" {
+            set rd [redis_deferring_client]
+            r del z1 z2
+
+            # Data arriving after the BZPOP.
+            $rd bzpop z1 z2 z2 z1 0
+            r zadd z1 0 a
+            assert_equal [$rd read] {z1 0 a}
+            $rd bzpop z1 z2 z2 z1 0
+            r zadd z2 1 b
+            assert_equal [$rd read] {z2 1 b}
+
+            # Data already there.
+            r zadd z1 0 a
+            r zadd z2 1 b
+            $rd bzpop z1 z2 z2 z1 0
+            assert_equal [$rd read] {z1 0 a}
+            $rd bzpop z1 z2 z2 z1 0
+            assert_equal [$rd read] {z2 1 b}
+        }
+
+        test "MULTI/EXEC is isolated from the point of view of BZPOP" {
+            set rd [redis_deferring_client]
+            r del zset
+            $rd bzpop zset 0
+            r multi
+            r zadd zset 0 a
+            r zadd zset 1 b
+            r zadd zset 2 c
+            r exec
+            $rd read
+        } {zset 0 a}
+
+        test "BZPOP with variadic ZADD" {
+            set rd [redis_deferring_client]
+            r del zset
+            if {$::valgrind} {after 100}
+            $rd bzpop zset 0
+            if {$::valgrind} {after 100}
+            assert_equal 2 [r zadd zset -1 foo 1 bar]
+            if {$::valgrind} {after 100}
+            assert_equal {zset -1 foo} [$rd read]
+            assert_equal {bar} [r zrange zset 0 -1]
+        }
+
+        test "BZPOP with zero timeout should block indefinitely" {
+            set rd [redis_deferring_client]
+            r del zset
+            $rd bzpop zset 0
+            after 1000
+            r zadd zset 0 foo
+            assert_equal {zset 0 foo} [$rd read]
+        }
     }
 
     tags {"slow"} {