请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

go-queue无法写入kafka的问题

学习到这里时,我发现我用github中官方提供的示例代码,是没有办法写入到kafka的,下面的代码也没有报错。
我加不加kq.WithFlushInterval(time.Millisecond) 这个参数都没有什么用。
我自己进行的尝试:我又用kafka-go这个包实验了一下,结果可以写入kafka,证明 我的kafka是运行正常的。

所以我不知道go-queue这个问题到底出在哪?

import (
	"encoding/json"
	"github.com/zeromicro/go-queue/kq"
	"testing"
	"time"
)

type message struct {
	Key     string `json:"key"`
	Value   string `json:"value"`
	Payload string `json:"message"`
}

func TestQueue(t *testing.T) {
	m := message{
		Key:     "1",
		Value:   "key1",
		Payload: "value1",
	}
	body, err := json.Marshal(m)
	if err != nil {
		t.Fatal(err)
	}

	pusher := kq.NewPusher([]string{"127.0.0.1:9092"}, "first", kq.WithFlushInterval(time.Millisecond)) //WithFlushInterval
	err = pusher.Push(string(body))
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(3 * time.Second)
}

正在回答 回答被采纳积分+3

2回答

木兮QwQ 2024-08-11 23:40:49

你可以用最新版本1.2.2,你的问题可能是代码与版本的使用不一致导致的,这是v1.2.2的使用示例

import (
   "context"
   "encoding/json"
   "github.com/zeromicro/go-queue/kq"
   "testing"
   "time"
)

type message struct {
   Key     string `json:"key"`
   Value   string `json:"value"`
   Payload string `json:"message"`
}

func TestQueue(t *testing.T) {
   m := message{
      Key:     "1",
      Value:   "key1",
      Payload: "value1",
   }
   body, err := json.Marshal(m)
   if err != nil {
      t.Fatal(err)
   }

   pusher := kq.NewPusher([]string{"192.168.117.24:9092"}, "msgChatTransfer", kq.WithFlushInterval(time.Millisecond)) //WithFlushInterval
   err = pusher.Push(context.Background(), string(body))
   if err != nil {
      t.Fatal(err)
   }

   time.Sleep(3 * time.Second)
}


0 回复 有任何疑惑可以回复我~
  • 提问者 404_ #1
    我最开始用的就是1.2.2版本,也是这个示例代码,写不进去
    回复 有任何疑惑可以回复我~ 2024-08-11 23:47:24
  • 提问者 404_ #2
    我在阿里云上部署的kafka,KAFKA_ADVERTISED_LISTENERS: INSIDE://192.168.117.80:9092  这个地方我没有改动。
    后来我又踩了个坑,把KAFKA_ADVERTISED_LISTENERS监听地址配置成内网IP,这样也不行,会报错:IO time out。
    只能将KAFKA_ADVERTISED_LISTENERS监听地址配置成外网IP,才能连接上,并成功写入。
    回复 有任何疑惑可以回复我~ 2024-08-12 00:20:09
  • 木兮QwQ 回复 提问者 404_ #3
    亲在课程中的配置得要根据你自己的系统而更改,课程中的配置只适合于当前课程所应用的机器,因此关于ip信息要改哦,后续课程中的其他地方也有相类似之处(„• ֊ •„)੭
    回复 有任何疑惑可以回复我~ 2024-08-12 08:22:04
木兮QwQ 2024-08-11 20:46:35
你好你使用的组件版本是多少
0 回复 有任何疑惑可以回复我~
  • 提问者 404_ #1
    github.com/zeromicro/go-queue v1.1.8
    
    kafka是2.3.0
    回复 有任何疑惑可以回复我~ 2024-08-11 22:20:52
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信