Josh Baker 26d0083faf Update vendoring to use golang/dep
commit a1a37d335a8e89ac89d85c00c8585d3fc02e064a
Author: Josh Baker <joshbaker77@gmail.com>
Date:   Thu Oct 5 07:36:54 2017 -0700

    use symlink instead of copy

commit 96399c2c92620f633611c778e5473200bfd48d41
Author: Josh Baker <joshbaker77@gmail.com>
Date:   Thu Oct 5 07:19:26 2017 -0700

    use dep for vendoring
2017-10-05 07:40:19 -07:00

235 lines
5.5 KiB
Go

// Command pubsub is an example of a fanout exchange with dynamic reliable
// membership, reading from stdin, writing to stdout.
//
// This example shows how to implement reconnect logic independent from a
// publish/subscribe loop with bridges to application types.
package main
import (
"bufio"
"crypto/sha1"
"flag"
"fmt"
"io"
"log"
"os"
"github.com/streadway/amqp"
"golang.org/x/net/context"
)
// url is the broker address taken from the -url flag. main passes the same
// value to the redial call of both the publisher and the subscriber.
var url = flag.String("url", "amqp:///", "AMQP url for both the publisher and subscriber")
// exchange is the name of the fanout exchange that binds the publishers to
// the subscribers.
const exchange = "pubsub"
// message is the application type for a message. This can contain identity,
// or a reference to the receiver chan for further demuxing.
type message []byte
// session composes an amqp.Connection with an amqp.Channel. Both are embedded,
// so code holding a session calls connection and channel methods directly on
// it. Close is declared on session itself to resolve the name both embedded
// types would otherwise contend for.
type session struct {
*amqp.Connection
*amqp.Channel
}
// Close shuts the session down by closing its connection, which takes the
// channel down with it. A session that never got a connection closes cleanly
// and reports no error.
func (s session) Close() error {
	if conn := s.Connection; conn != nil {
		return conn.Close()
	}
	return nil
}
// redial continually connects to the URL, exiting the program when no longer
// possible.
//
// The returned channel yields a reply channel each time a consumer asks for a
// session; the consumer then receives the freshly dialed session from that
// reply channel. Every session comes with the fanout exchange already
// declared. Dial, channel and exchange failures terminate the process via
// log.Fatalf. When ctx is cancelled the factory goroutine stops and the
// returned channel is closed.
func redial(ctx context.Context, url string) chan chan session {
	sessions := make(chan chan session)

	go func() {
		sess := make(chan session)
		defer close(sessions)

		for {
			// Offer the reply channel. A consumer picking it up is the
			// request for a new session, so nothing is dialed before then.
			select {
			case sessions <- sess:
			case <-ctx.Done():
				log.Println("shutting down session factory")
				return
			}

			conn, err := amqp.Dial(url)
			if err != nil {
				log.Fatalf("cannot (re)dial: %v: %q", err, url)
			}

			ch, err := conn.Channel()
			if err != nil {
				log.Fatalf("cannot create channel: %v", err)
			}

			if err := ch.ExchangeDeclare(exchange, "fanout", false, true, false, false, nil); err != nil {
				log.Fatalf("cannot declare fanout exchange: %v", err)
			}

			select {
			case sess <- session{conn, ch}:
			case <-ctx.Done():
				log.Println("shutting down new session")
				// Nobody is going to receive this session, so release the
				// connection here instead of leaking it. Best effort: the
				// factory is stopping either way.
				_ = conn.Close()
				return
			}
		}
	}()

	return sessions
}
// publish publishes messages to a reconnecting session to a fanout exchange.
// It receives from the application specific source of messages.
//
// One message is in flight at a time: after a message is read it is parked in
// pending, published, and no further message is read until the broker's
// confirmation arrives. A message whose publish call fails stays in pending
// and is retried on the next session. publish returns when messages is closed
// or when sessions is closed.
func publish(sessions chan chan session, messages <-chan message) {
	var (
		// last is the body most recently handed to the broker, kept across
		// loop iterations so a nack can be logged together with its body.
		last message
		// pending outlives any single session so that a message whose
		// publish failed is still there when the next session starts.
		pending = make(chan message, 1)
	)

	for next := range sessions {
		var (
			reading = messages
			confirm = make(chan amqp.Confirmation, 1)
		)

		// A message left over from a failed session must go out first;
		// reading another one now would block forever on the full buffer.
		if len(pending) > 0 {
			reading = nil
		}

		pub := <-next

		// publisher confirms for this channel/connection
		if err := pub.Confirm(false); err != nil {
			log.Printf("publisher confirms not supported")
			// A closed confirm channel makes the loop below give up on this
			// session and ask for a new one.
			close(confirm)
		} else {
			pub.NotifyPublish(confirm)
		}

		log.Printf("publishing...")

	Publish:
		for {
			select {
			case confirmed, ok := <-confirm:
				if !ok {
					// No more confirmations will arrive on this session.
					// Release it (best effort, it may already be closed) so a
					// still-open connection is not leaked.
					pub.Close()
					break Publish
				}
				if !confirmed.Ack {
					log.Printf("nack message %d, body: %q", confirmed.DeliveryTag, string(last))
				}
				reading = messages

			case msg := <-pending:
				last = msg
				routingKey := "ignored for fanout exchanges, application dependent for other exchanges"
				err := pub.Publish(exchange, routingKey, false, false, amqp.Publishing{
					Body: msg,
				})
				// Retry failed delivery on the next session
				if err != nil {
					pending <- msg
					pub.Close()
					break Publish
				}

			case msg, ok := <-reading:
				// all messages consumed
				if !ok {
					return
				}
				// work on pending delivery until ack'd
				pending <- msg
				reading = nil
			}
		}
	}
}
// identity derives a hex-encoded SHA-1 digest from the hostname, the error
// returned while looking the hostname up, and the process id. The value is
// stable for the lifetime of the process, so subscriber reconnections reuse
// the same queue name.
func identity() string {
	host, hostErr := os.Hostname()
	// The lookup error is deliberately part of the seed rather than handled.
	seed := fmt.Sprintf("%s%v%d", host, hostErr, os.Getpid())
	return fmt.Sprintf("%x", sha1.Sum([]byte(seed)))
}
// subscribe consumes deliveries from an exclusive queue from a fanout exchange
// and sends to the application specific messages chan.
//
// The queue is named after identity(), so every session of this process uses
// the same queue name. Each delivery is forwarded to messages and then
// acknowledged. When the delivery stream of a session ends, the session is
// closed and the next one is requested. A failure to declare, bind or consume
// is logged, the session is closed, and subscribe returns.
func subscribe(sessions chan chan session, messages chan<- message) {
	queue := identity()

	for next := range sessions {
		sub := <-next

		if _, err := sub.QueueDeclare(queue, false, true, true, false, nil); err != nil {
			log.Printf("cannot consume from exclusive queue: %q, %v", queue, err)
			sub.Close() // best effort, do not leak the connection on the way out
			return
		}

		routingKey := "application specific routing key for fancy toplogies"
		if err := sub.QueueBind(queue, routingKey, exchange, false, nil); err != nil {
			log.Printf("cannot consume without a binding to exchange: %q, %v", exchange, err)
			sub.Close() // best effort, do not leak the connection on the way out
			return
		}

		deliveries, err := sub.Consume(queue, "", false, true, false, false, nil)
		if err != nil {
			log.Printf("cannot consume from: %q, %v", queue, err)
			sub.Close() // best effort, do not leak the connection on the way out
			return
		}

		log.Printf("subscribed...")

		for msg := range deliveries {
			messages <- message(msg.Body)
			// Acknowledge only after the application has taken the message.
			if err := sub.Ack(msg.DeliveryTag, false); err != nil {
				log.Printf("cannot ack delivery %d: %v", msg.DeliveryTag, err)
			}
		}

		// The delivery stream has ended. Release this session before asking
		// for the next one; closing one that is already gone only returns an
		// error, which is ignored.
		sub.Close()
	}
}
// read is this application's translation to the message format, scanning from
// stdin.
//
// It starts a goroutine that splits r into lines and sends each line, without
// its line terminator, on the returned channel. The channel is closed once r
// is exhausted or scanning fails; a scanning failure is logged.
func read(r io.Reader) <-chan message {
	lines := make(chan message)

	go func() {
		defer close(lines)

		scan := bufio.NewScanner(r)
		for scan.Scan() {
			// Bytes returns a view into the scanner's buffer that a later
			// Scan may overwrite. The receiver keeps the message after this
			// send, so it gets its own copy.
			raw := scan.Bytes()
			line := make(message, len(raw))
			copy(line, raw)
			lines <- line
		}
		if err := scan.Err(); err != nil {
			log.Printf("cannot read input: %v", err)
		}
	}()

	return lines
}
// write is the application's sink for subscribed messages. It returns a
// channel and starts a goroutine that prints every message received on it to
// w, one per line, until the channel is closed.
func write(w io.Writer) chan<- message {
	out := make(chan message)

	go func() {
		for {
			line, open := <-out
			if !open {
				return
			}
			fmt.Fprintln(w, string(line))
		}
	}()

	return out
}
func main() {
flag.Parse()
ctx, done := context.WithCancel(context.Background())
go func() {
publish(redial(ctx, *url), read(os.Stdin))
done()
}()
go func() {
subscribe(redial(ctx, *url), write(os.Stdout))
done()
}()
<-ctx.Done()
}