Слияние кода завершено, страница обновится автоматически
Go-queue
dq
Высокодоступный beanstalkd.
Пример потребителя:
consumer := dq.NewConsumer(dq.DqConf{
Beanstalks: []dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11300",
Tube: "tube",
}
},
Redis: redis.RedisConf{
Host: "localhost:6379",
Type: redis.NodeType,
}
})
consumer.Consume(func(body []byte) {
fmt.Println(string(body))
})
Пример производителя:
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:11300",
Tube: "tube",
},
{
Endpoint: "localhost:11300",
Tube: "tube",
}
})
for i := 1000; i < 1005; i++ {
_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
if err != nil {
fmt.Println(err)
}
}
kq
Kafka Pub/Sub framework.
Пример потребителя:
config.json
Name: kq
Brokers:
- 127.0.0.1:19092
- 127.0.0.1:19092
- 127.0.0.1:19092
Group: adhoc
Topic: kq
Offset: first
Consumers: 1
example code
var c kq.KqConf
conf.MustLoad("config.json", &c)
q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
fmt.Printf("=> %s\n", v)
return nil
}))
defer q.Stop()
q.Start()
Пример производителя:
type message struct {
Key string `json:"key"`
Value string `json:"value"`
Payload string `json:"message"`
}
pusher := kq.NewPusher([]string{
"127.0.0.1:19092",
"127.0.0.1:19092",
"127.0.0.1:19092",
}, "kq")
ticker := time.NewTicker(time.Millisecond)
for round := 0; round < 3; round++ {
select {
case <-ticker.C:
count := rand.Intn(100)
m := message{
Key: strconv.FormatInt(time.Now().UnixNano(), 10),
Value: fmt.Sprintf("%d,%d", round, count),
Payload: fmt.Sprintf("%d,%d", round, count),
}
body, err := json.Marshal(m)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(body))
if err := pusher.Push(string(body)); err != nil {
log.Fatal(err)
}
}
}
cmdline.EnterToContinue()
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )