Test: Tcl client initial support for automatic reconnection.
This commit is contained in:
parent
9162b5f8f9
commit
670d164fbe
@ -31,8 +31,10 @@ package provide redis 0.1
|
||||
namespace eval redis {}
|
||||
set ::redis::id 0
|
||||
array set ::redis::fd {}
|
||||
array set ::redis::addr {}
|
||||
array set ::redis::blocking {}
|
||||
array set ::redis::deferred {}
|
||||
array set ::redis::reconnect {}
|
||||
array set ::redis::callback {}
|
||||
array set ::redis::state {} ;# State in non-blocking reply reading
|
||||
array set ::redis::statestack {} ;# Stack of states, for nested mbulks
|
||||
@ -42,35 +44,63 @@ proc redis {{server 127.0.0.1} {port 6379} {defer 0}} {
|
||||
fconfigure $fd -translation binary
|
||||
set id [incr ::redis::id]
|
||||
set ::redis::fd($id) $fd
|
||||
set ::redis::addr($id) [list $server $port]
|
||||
set ::redis::blocking($id) 1
|
||||
set ::redis::deferred($id) $defer
|
||||
set ::redis::reconnect($id) 0
|
||||
::redis::redis_reset_state $id
|
||||
interp alias {} ::redis::redisHandle$id {} ::redis::__dispatch__ $id
|
||||
}
|
||||
|
||||
# This is a wrapper to the actual dispatching procedure that handles
|
||||
# reconnection if needed.
|
||||
proc ::redis::__dispatch__ {id method args} {
|
||||
set errorcode [catch {::redis::__dispatch__raw__ $id $method $args} retval]
|
||||
if {$errorcode && $::redis::reconnect($id) && $::redis::fd($id) eq {}} {
|
||||
# Try again if the connection was lost.
|
||||
# FIXME: we don't re-select the previously selected DB, nor we check
|
||||
# if we are inside a transaction that needs to be re-issued from
|
||||
# scratch.
|
||||
set errorcode [catch {::redis::__dispatch__raw__ $id $method $args} retval]
|
||||
}
|
||||
return -code $errorcode $retval
|
||||
}
|
||||
|
||||
proc ::redis::__dispatch__raw__ {id method argv} {
|
||||
set fd $::redis::fd($id)
|
||||
|
||||
# Reconnect the link if needed.
|
||||
if {$fd eq {}} {
|
||||
lassign $::redis::addr($id) host port
|
||||
set ::redis::fd($id) [socket $host $port]
|
||||
fconfigure $::redis::fd($id) -translation binary
|
||||
set fd $::redis::fd($id)
|
||||
}
|
||||
|
||||
set blocking $::redis::blocking($id)
|
||||
set deferred $::redis::deferred($id)
|
||||
if {$blocking == 0} {
|
||||
if {[llength $args] == 0} {
|
||||
if {[llength $argv] == 0} {
|
||||
error "Please provide a callback in non-blocking mode"
|
||||
}
|
||||
set callback [lindex $args end]
|
||||
set args [lrange $args 0 end-1]
|
||||
set callback [lindex $argv end]
|
||||
set argv [lrange $argv 0 end-1]
|
||||
}
|
||||
if {[info command ::redis::__method__$method] eq {}} {
|
||||
set cmd "*[expr {[llength $args]+1}]\r\n"
|
||||
set cmd "*[expr {[llength $argv]+1}]\r\n"
|
||||
append cmd "$[string length $method]\r\n$method\r\n"
|
||||
foreach a $args {
|
||||
foreach a $argv {
|
||||
append cmd "$[string length $a]\r\n$a\r\n"
|
||||
}
|
||||
::redis::redis_write $fd $cmd
|
||||
flush $fd
|
||||
if {[catch {flush $fd}]} {
|
||||
set ::redis::fd($id) {}
|
||||
return -code error "I/O error reading reply"
|
||||
}
|
||||
|
||||
if {!$deferred} {
|
||||
if {$blocking} {
|
||||
::redis::redis_read_reply $fd
|
||||
::redis::redis_read_reply $id $fd
|
||||
} else {
|
||||
# Every well formed reply read will pop an element from this
|
||||
# list and use it as a callback. So pipelining is supported
|
||||
@ -80,7 +110,7 @@ proc ::redis::__dispatch__ {id method args} {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
uplevel 1 [list ::redis::__method__$method $id $fd] $args
|
||||
uplevel 1 [list ::redis::__method__$method $id $fd] $argv
|
||||
}
|
||||
}
|
||||
|
||||
@ -89,8 +119,12 @@ proc ::redis::__method__blocking {id fd val} {
|
||||
fconfigure $fd -blocking $val
|
||||
}
|
||||
|
||||
proc ::redis::__method__reconnect {id fd val} {
|
||||
set ::redis::reconnect($id) $val
|
||||
}
|
||||
|
||||
proc ::redis::__method__read {id fd} {
|
||||
::redis::redis_read_reply $fd
|
||||
::redis::redis_read_reply $id $fd
|
||||
}
|
||||
|
||||
proc ::redis::__method__write {id fd buf} {
|
||||
@ -104,8 +138,10 @@ proc ::redis::__method__flush {id fd} {
|
||||
proc ::redis::__method__close {id fd} {
|
||||
catch {close $fd}
|
||||
catch {unset ::redis::fd($id)}
|
||||
catch {unset ::redis::addr($id)}
|
||||
catch {unset ::redis::blocking($id)}
|
||||
catch {unset ::redis::deferred($id)}
|
||||
catch {unset ::redis::reconnect($id)}
|
||||
catch {unset ::redis::state($id)}
|
||||
catch {unset ::redis::statestack($id)}
|
||||
catch {unset ::redis::callback($id)}
|
||||
@ -143,14 +179,14 @@ proc ::redis::redis_bulk_read {fd} {
|
||||
return $buf
|
||||
}
|
||||
|
||||
proc ::redis::redis_multi_bulk_read fd {
|
||||
proc ::redis::redis_multi_bulk_read {id fd} {
|
||||
set count [redis_read_line $fd]
|
||||
if {$count == -1} return {}
|
||||
set l {}
|
||||
set err {}
|
||||
for {set i 0} {$i < $count} {incr i} {
|
||||
if {[catch {
|
||||
lappend l [redis_read_reply $fd]
|
||||
lappend l [redis_read_reply $id $fd]
|
||||
} e] && $err eq {}} {
|
||||
set err $e
|
||||
}
|
||||
@ -163,16 +199,19 @@ proc ::redis::redis_read_line fd {
|
||||
string trim [gets $fd]
|
||||
}
|
||||
|
||||
proc ::redis::redis_read_reply fd {
|
||||
proc ::redis::redis_read_reply {id fd} {
|
||||
set type [read $fd 1]
|
||||
switch -exact -- $type {
|
||||
: -
|
||||
+ {redis_read_line $fd}
|
||||
- {return -code error [redis_read_line $fd]}
|
||||
$ {redis_bulk_read $fd}
|
||||
* {redis_multi_bulk_read $fd}
|
||||
* {redis_multi_bulk_read $id $fd}
|
||||
default {
|
||||
if {$type eq {}} {return -code error "I/O error reading reply"}
|
||||
if {$type eq {}} {
|
||||
set ::redis::fd($id) {}
|
||||
return -code error "I/O error reading reply"
|
||||
}
|
||||
return -code error "Bad protocol, '$type' as reply type byte"
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user