学习到这里时,我发现我用github中官方提供的示例代码,是没有办法写入到kafka的,下面的代码也没有报错。
我加不加kq.WithFlushInterval(time.Millisecond) 这个参数都没有什么用。
我自己进行的尝试:我又用kafka-go这个包实验了一下,结果可以写入kafka,证明 我的kafka是运行正常的。
所以我不知道go-queue这个问题到底出在哪?
import (
"encoding/json"
"github.com/zeromicro/go-queue/kq"
"testing"
"time"
)
type message struct {
Key string `json:"key"`
Value string `json:"value"`
Payload string `json:"message"`
}
func TestQueue(t *testing.T) {
m := message{
Key: "1",
Value: "key1",
Payload: "value1",
}
body, err := json.Marshal(m)
if err != nil {
t.Fatal(err)
}
pusher := kq.NewPusher([]string{"127.0.0.1:9092"}, "first", kq.WithFlushInterval(time.Millisecond)) //WithFlushInterval
err = pusher.Push(string(body))
if err != nil {
t.Fatal(err)
}
time.Sleep(3 * time.Second)
}