
This commit includes updates that affects the build, testing, and deployment of Tile38. - The root level build.sh has been broken up into multiple scripts and placed in the "scripts" directory. - The vendor directory has been updated to follow the Go modules rules, thus `make` should work on isolated environments. Also some vendored packages may have been updated to a later version, if needed. - The Makefile has been updated to allow for making single binaries such as `make tile38-server`. There is some scaffolding during the build process, so from now on all binaries should be made using make. For example, to run a development version of the tile38-cli binary, do this: make tile38-cli && ./tile38-cli not this: go run cmd/tile38-cli/main.go - Travis.CI docker push script has been updated to address a change to Docker's JSON repo meta output, which in turn fixes a bug where new Tile38 versions were not being properly pushed to Docker
151 lines
3.2 KiB
Go
151 lines
3.2 KiB
Go
package sarama
|
|
|
|
type fetchRequestBlock struct {
|
|
fetchOffset int64
|
|
maxBytes int32
|
|
}
|
|
|
|
func (b *fetchRequestBlock) encode(pe packetEncoder) error {
|
|
pe.putInt64(b.fetchOffset)
|
|
pe.putInt32(b.maxBytes)
|
|
return nil
|
|
}
|
|
|
|
func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
|
|
if b.fetchOffset, err = pd.getInt64(); err != nil {
|
|
return err
|
|
}
|
|
if b.maxBytes, err = pd.getInt32(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See
|
|
// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at
|
|
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
|
|
type FetchRequest struct {
|
|
MaxWaitTime int32
|
|
MinBytes int32
|
|
MaxBytes int32
|
|
Version int16
|
|
blocks map[string]map[int32]*fetchRequestBlock
|
|
}
|
|
|
|
func (r *FetchRequest) encode(pe packetEncoder) (err error) {
|
|
pe.putInt32(-1) // replica ID is always -1 for clients
|
|
pe.putInt32(r.MaxWaitTime)
|
|
pe.putInt32(r.MinBytes)
|
|
if r.Version == 3 {
|
|
pe.putInt32(r.MaxBytes)
|
|
}
|
|
err = pe.putArrayLength(len(r.blocks))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for topic, blocks := range r.blocks {
|
|
err = pe.putString(topic)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = pe.putArrayLength(len(blocks))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for partition, block := range blocks {
|
|
pe.putInt32(partition)
|
|
err = block.encode(pe)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
|
|
r.Version = version
|
|
if _, err = pd.getInt32(); err != nil {
|
|
return err
|
|
}
|
|
if r.MaxWaitTime, err = pd.getInt32(); err != nil {
|
|
return err
|
|
}
|
|
if r.MinBytes, err = pd.getInt32(); err != nil {
|
|
return err
|
|
}
|
|
if r.Version == 3 {
|
|
if r.MaxBytes, err = pd.getInt32(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
topicCount, err := pd.getArrayLength()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if topicCount == 0 {
|
|
return nil
|
|
}
|
|
r.blocks = make(map[string]map[int32]*fetchRequestBlock)
|
|
for i := 0; i < topicCount; i++ {
|
|
topic, err := pd.getString()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
partitionCount, err := pd.getArrayLength()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
r.blocks[topic] = make(map[int32]*fetchRequestBlock)
|
|
for j := 0; j < partitionCount; j++ {
|
|
partition, err := pd.getInt32()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fetchBlock := &fetchRequestBlock{}
|
|
if err = fetchBlock.decode(pd); err != nil {
|
|
return err
|
|
}
|
|
r.blocks[topic][partition] = fetchBlock
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *FetchRequest) key() int16 {
|
|
return 1
|
|
}
|
|
|
|
func (r *FetchRequest) version() int16 {
|
|
return r.Version
|
|
}
|
|
|
|
func (r *FetchRequest) requiredVersion() KafkaVersion {
|
|
switch r.Version {
|
|
case 1:
|
|
return V0_9_0_0
|
|
case 2:
|
|
return V0_10_0_0
|
|
case 3:
|
|
return V0_10_1_0
|
|
default:
|
|
return minVersion
|
|
}
|
|
}
|
|
|
|
func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
|
|
if r.blocks == nil {
|
|
r.blocks = make(map[string]map[int32]*fetchRequestBlock)
|
|
}
|
|
|
|
if r.blocks[topic] == nil {
|
|
r.blocks[topic] = make(map[int32]*fetchRequestBlock)
|
|
}
|
|
|
|
tmp := new(fetchRequestBlock)
|
|
tmp.maxBytes = maxBytes
|
|
tmp.fetchOffset = fetchOffset
|
|
|
|
r.blocks[topic][partitionID] = tmp
|
|
}
|