Язык разработки: Golang.
go get -u gitee.com/tym_hmm/rabbitmq-pool-go
Уже используется в производстве, 5200W запросов qbs 3000, соединение пула показывает отсутствие нагрузки. RabbitMQ развёрнут как кластер в сети.
Название | Описание |
---|---|
Максимальное количество TCP-соединений | 5 |
Максимальное число попыток отправки при неудаче (производитель) | 5 |
Максимальное количество каналов для каждого соединения (потребитель) | 100 (10 на каждый TCP) |
var oncePool sync.Once
var instanceRPool *kelleyRabbimqPool.RabbitPool
func initrabbitmq() *kelleyRabbimqPool.RabbitPool {
oncePool.Do(func() {
// Инициализируем производителя
instanceRPool = kelleyRabbimqPool.NewProductPool()
// Инициализируем потребителя
instanceConsumePool = kelleyRabbimqPool.NewConsumePool()
// Используем виртуальный хост "/" по умолчанию
err := instanceRPool.Connect("192.168.1.202", 5672, "guest", "guest")
// Используем пользовательский виртуальный хост
// err:=instanceConsumePool.ConnectVirtualHost("192.168.1.202", 5672, "guest", "guest", "/testHost")
if err != nil {
fmt.Println(err)
}
})
return instanceRPool
}
var wg sync.WaitGroup
for i:=0;i<100000; i++ {
wg.Add(1)
go func(num int) {
defer wg.Done()
data:=kelleyRabbimqPool.GetRabbitMqDataFormat("testChange5", kelleyRabbimqPool.EXCHANGE_TYPE_TOPIC, "textQueue5", "/", fmt.Sprintf("Здесь данные %d", num))
_=instanceRPool.Push(data)
}(i)
}
wg.Wait()
Можно определить несколько событий для потребителей, не связанных с обменом, очередью или маршрутом. Каждое событие независимо.
nomrl := &rabbitmq.ConsumeReceive{
# Определяем событие потребителя
ExchangeName: "testChange31",// Имя очереди
ExchangeType: kelleyRabbimqPool.EXCHANGE_TYPE_DIRECT,
Route: "",
QueueName: "testQueue31",
IsTry:true,// Перепробовать ли
IsAutoAck: false, // Автоматически подтверждать сообщение
MaxReTry: 5,// Максимальное количество повторных попыток
EventFail: func(code int, e error, data []byte) {
fmt.Printf("error:%s", e)
},
/***
* Параметры пояснения
* @param data []byte Полученное сообщение rabbitmq
* @param header map[string]interface{} Исходный заголовок rabbitmq
* @param retryClient RabbitmqPool.RetryClientInterface Пользовательский интерфейс для повторной попытки данных, необходимо вернуть true, чтобы предотвратить повторную отправку данных
***/
EventSuccess: func(data []byte, header map[string]interface{},retryClient kelleyRabbimqPool.RetryClientInterface)bool {// Если возвращается true, повторная попытка не требуется
_ = retryClient.Ack()// Подтвердить сообщение
fmt.Printf("data:%s\n", string(data))
return true
},
}
instanceConsumePool.RegisterConsumeReceive(nomrl)
err := instanceConsumePool.RunConsume()
if err != nil {
fmt.Println(err)
}
- Параметры объяснения
Имя | Тип | Объяснение |
---|---|---|
ExchangeName | string | Имя обмена |
ExchangeType | string | Тип обмена: EXCHANGE_TYPE_FANOUT EXCHANGE_TYPE_DIRECT EXCHANGE_TYPE_TOPIC |
Route | string | Ключ маршрута |
QueueName | string | Имя очереди |
IsTry | bool | Перепробовать ли Если включить повторные попытки, после успешного обратного вызова будет возвращено значение true для повторной отправки сообщения, время повторной попытки составляет 5000~15000 MS |
IsAutoAck | bool | Автоматически подтверждать сообщения, true: компонент автоматически подтверждает сообщения в фоновом режиме false: вручную подтвердить сообщение, успешно вызвать _ = retryClient.Ack()` для подтверждения сообщения |
MaxReTry | int | Максимальное количество повторных попыток, требуется isTry=true |
EventFail | func | Обратный вызов при ошибке |
EventSuccess | func | Успешный обратный вызов |
Коды ошибок:
- Возвращаемый код при отправке сообщения производителем.
- Код, возвращаемый при прослушивании события потребителем.
Код ошибки | Объяснение |
---|---|
501 | Отправка сообщения производителем превысила максимальное количество повторных попыток |
502 | Не удалось получить канал, обычно из-за исчерпания количества очередей |
503 | Ошибка обмена/очереди/привязки |
504 | Ошибка подключения |
506 | Ошибка создания канала |
507 | Превышено максимальное количество повторных попыток |
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )