В RabbitMQ используется один и тот же обменник, очередь, routekey. После прослушивания очереди сообщения маршрутизируются через routePath для разделения логики задач. Одно сообщение в очереди может реагировать на различные события в зависимости от настроенного маршрута. Поддерживается различная обработка маршрутов для разных очередей на основе пула соединений RabbitMQ.
Совместимость с пулом соединений RabbitMQ и пользовательскими VirtualHosts.
Продукт уже используется в производстве, обрабатывает более 100 миллионов данных в день, способен обрабатывать более 1000 очередей сообщений в секунду. Кластер RabbitMQ и сервисные приложения не испытывают высокой нагрузки.
На основе инструмента пула подключений RabbitMQ =>
go get -u gitee.com/tym_hmm/rabbitmq-pool-router-path-go
var wg sync.WaitGroup
exchangeName := "test-data-center-exchange-name"
exchangeType := RabbitmqPool.EXCHANGE_TYPE_TOPIC
queueName := "test-data-center-queue-name"
routeKey := "test-data"
routePath := "/test/a"
wg.Add(2)
go func() {
defer wg.Done()
data:="这是一个数据test/a"
//Использование по умолчанию VirtualHosts для отправки
//product:=RabbitmqRoute.NewProductClient(host, port, user, pwd)
//Использование пользовательских VirtualHosts для отправки
product:=RabbitmqRoute.NewProductClientVirtualHosts(host, port, user, pwd, "/temptest1")
err:=product.Publish(exchangeName, exchangeType, queueName, routeKey, routePath, data)
if err !=nil{
fmt.Println(err)
}
}()
go func() {
defer wg.Done()
routePatsh := "/testaa"
datas:="这是一个数据test"
//Использование по умолчанию VirtualHosts для отправки
//products:=RabbitmqRoute.NewProductClient(host, port, user, pwd)
//Использование пользовательских VirtualHosts для отправки
products:=RabbitmqRoute.NewProductClientVirtualHosts(host, port, user, pwd, "/temptest1")
errs:=products.Publish(exchangeName, exchangeType, queueName, routeKey, routePatsh, datas)
if errs !=nil{
fmt.Println(errs)
}
}()
wg.Wait()
consumer := RabbitmqRoute.NewTask(host, port, user, pwd)
//Установка информации о загружаемом узле (временно поддерживает только присоединение одного узла)
consumer.SetHandleNode(&RabbitmqRoute.NodeInfo{
NodeName: "dataCenter",
ExchangeName: "test-data-center-exchange-name",
ExchangeType: RabbitmqPool.EXCHANGE_TYPE_TOPIC,
Route: "test-data",
QueueName: "test-data-center-queue-name",
IsTry: false,
MaxReTry: 1,
})
//Регистрация загруженного маршрута
consumer.RouteRegister(func(engine *RabbitmqRoute.TaskEngine) {
//fmt.Println(engine)
engine.AddRoute("/test", func(c *RabbitmqRoute.TaskContext) {
fmt.Println(c.Request.Data)
//c.Request.Data
})
engine.AddRoute("/test/a", func(c *RabbitmqRoute.TaskContext) {
fmt.Println(c.Request.Data)
})
})
err := consumer.Enter()
if err != nil {
fmt.Println(err)
}
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )