Allow for standard SQS URLs
Both now work: https://sqs.us-east-1.amazonaws.com/349840735605/TestTile38Queue sqs://us-east-1:349840735605/TestTile38Queue
This commit is contained in:
parent
ec57aaee1a
commit
5335aec942
@ -90,6 +90,7 @@ type Endpoint struct {
|
||||
KeyFile string
|
||||
}
|
||||
SQS struct {
|
||||
PlainURL string
|
||||
QueueID string
|
||||
Region string
|
||||
CredPath string
|
||||
@ -217,7 +218,12 @@ func parseEndpoint(s string) (Endpoint, error) {
|
||||
case strings.HasPrefix(s, "http:"):
|
||||
endpoint.Protocol = HTTP
|
||||
case strings.HasPrefix(s, "https:"):
|
||||
if probeSQS(s) {
|
||||
endpoint.SQS.PlainURL = s
|
||||
endpoint.Protocol = SQS
|
||||
} else {
|
||||
endpoint.Protocol = HTTP
|
||||
}
|
||||
case strings.HasPrefix(s, "disque:"):
|
||||
endpoint.Protocol = Disque
|
||||
case strings.HasPrefix(s, "grpc:"):
|
||||
@ -469,6 +475,7 @@ func parseEndpoint(s string) (Endpoint, error) {
|
||||
// credpath - path where aws credentials are located
|
||||
// credprofile - credential profile
|
||||
if endpoint.Protocol == SQS {
|
||||
if endpoint.SQS.PlainURL == "" {
|
||||
// Parsing connection from URL string
|
||||
hp := strings.Split(s, ":")
|
||||
switch len(hp) {
|
||||
@ -487,6 +494,11 @@ func parseEndpoint(s string) (Endpoint, error) {
|
||||
return endpoint, errors.New("invalid SQS queue name")
|
||||
}
|
||||
}
|
||||
// Throw error if we not provide any queue name
|
||||
if endpoint.SQS.QueueName == "" {
|
||||
return endpoint, errors.New("missing SQS queue name")
|
||||
}
|
||||
}
|
||||
|
||||
// Parsing additional params
|
||||
if len(sqp) > 1 {
|
||||
@ -512,10 +524,6 @@ func parseEndpoint(s string) (Endpoint, error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Throw error if we not provide any queue name
|
||||
if endpoint.SQS.QueueName == "" {
|
||||
return endpoint, errors.New("missing SQS queue name")
|
||||
}
|
||||
}
|
||||
|
||||
// Basic AMQP connection strings in HOOKS interface
|
||||
|
@ -3,6 +3,7 @@ package endpoint
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -31,7 +32,11 @@ type SQSConn struct {
|
||||
}
|
||||
|
||||
func (conn *SQSConn) generateSQSURL() string {
|
||||
return "https://sqs." + conn.ep.SQS.Region + "amazonaws.com/" + conn.ep.SQS.QueueID + "/" + conn.ep.SQS.QueueName
|
||||
if conn.ep.SQS.PlainURL != "" {
|
||||
return conn.ep.SQS.PlainURL
|
||||
}
|
||||
return "https://sqs." + conn.ep.SQS.Region + ".amazonaws.com/" +
|
||||
conn.ep.SQS.QueueID + "/" + conn.ep.SQS.QueueName
|
||||
}
|
||||
|
||||
// Expired returns true if the connection has expired
|
||||
@ -74,8 +79,14 @@ func (conn *SQSConn) Send(msg string) error {
|
||||
}
|
||||
creds = credentials.NewSharedCredentials(credPath, credProfile)
|
||||
}
|
||||
var region string
|
||||
if conn.ep.SQS.Region != "" {
|
||||
region = conn.ep.SQS.Region
|
||||
} else {
|
||||
region = sqsRegionFromPlainURL(conn.ep.SQS.PlainURL)
|
||||
}
|
||||
sess := session.Must(session.NewSession(&aws.Config{
|
||||
Region: aws.String(conn.ep.SQS.Region),
|
||||
Region: ®ion,
|
||||
Credentials: creds,
|
||||
MaxRetries: aws.Int(5),
|
||||
}))
|
||||
@ -114,3 +125,20 @@ func newSQSConn(ep Endpoint) *SQSConn {
|
||||
t: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
func probeSQS(s string) bool {
|
||||
// https://sqs.eu-central-1.amazonaws.com/123456789/myqueue
|
||||
return strings.HasPrefix(s, "https://sqs.") &&
|
||||
strings.Contains(s, ".amazonaws.com/")
|
||||
}
|
||||
|
||||
func sqsRegionFromPlainURL(s string) string {
|
||||
parts := strings.Split(s, "https://sqs.")
|
||||
if len(parts) > 1 {
|
||||
parts = strings.Split(parts[1], ".amazonaws.com/")
|
||||
if len(parts) > 1 {
|
||||
return parts[0]
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user