1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/tym_hmm-rabbitmq-pool-go

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
readme.md

Соединение RabbitMQ с использованием пула каналов

Язык разработки: Golang.

Зависимости

go get -u gitee.com/tym_hmm/rabbitmq-pool-go

Уже используется в производстве, 5200W запросов qbs 3000, соединение пула показывает отсутствие нагрузки. RabbitMQ развёрнут как кластер в сети.

Дальнейшие функции, ожидаемые в версии 1.0.15

  1. Увеличить пакетную обработку сообщений для повышения пропускной способности производства и потребления.

Описание функций

  1. Настроить размер пула соединений и максимальное количество обрабатываемых каналов.
  2. Автоматическое переподключение при обрыве соединения на стороне потребителя.
  3. Автоматическое переподключение при обрыве соединения на стороне производителя (v1.0.12).
  4. Повторное использование TCP с помощью циклического алгоритма.
  5. Каждому TCP соответствует один канал, чтобы предотвратить блокировку канала при записи и чрезмерное использование памяти.
  6. Поддержка различных типов обмена RabbitMQ.
  7. По умолчанию обмен, очередь и сообщения сохраняются на диске.
  8. Значения по умолчанию:
Название Описание
Максимальное количество TCP-соединений 5
Максимальное число попыток отправки при неудаче (производитель) 5
Максимальное количество каналов для каждого соединения (потребитель) 100 (10 на каждый TCP)

Использование

  1. Инициализация:
var oncePool sync.Once
var instanceRPool *kelleyRabbimqPool.RabbitPool
func initrabbitmq() *kelleyRabbimqPool.RabbitPool {
	oncePool.Do(func() {
        // Инициализируем производителя
		instanceRPool = kelleyRabbimqPool.NewProductPool()
        // Инициализируем потребителя
	    instanceConsumePool = kelleyRabbimqPool.NewConsumePool()
        // Используем виртуальный хост "/" по умолчанию
		err := instanceRPool.Connect("192.168.1.202", 5672, "guest", "guest")
        // Используем пользовательский виртуальный хост
        // err:=instanceConsumePool.ConnectVirtualHost("192.168.1.202", 5672, "guest", "guest", "/testHost")
		if err != nil {
			fmt.Println(err)
		}
	})
	return instanceRPool
}
  1. Производитель:
var wg sync.WaitGroup
	for i:=0;i<100000; i++ {
		wg.Add(1)
		go func(num int) {
			defer wg.Done()
			data:=kelleyRabbimqPool.GetRabbitMqDataFormat("testChange5", kelleyRabbimqPool.EXCHANGE_TYPE_TOPIC, "textQueue5", "/", fmt.Sprintf("Здесь данные %d", num))
			_=instanceRPool.Push(data)
		}(i)
	}
	wg.Wait()
  1. Потребитель:

Можно определить несколько событий для потребителей, не связанных с обменом, очередью или маршрутом. Каждое событие независимо.

nomrl := &rabbitmq.ConsumeReceive{
        # Определяем событие потребителя
        ExchangeName: "testChange31",// Имя очереди
        ExchangeType: kelleyRabbimqPool.EXCHANGE_TYPE_DIRECT,
        Route:        "",
        QueueName:    "testQueue31",
        IsTry:true,// Перепробовать ли
        IsAutoAck: false, // Автоматически подтверждать сообщение
        MaxReTry: 5,// Максимальное количество повторных попыток
        EventFail: func(code int, e error, data []byte) {
        	fmt.Printf("error:%s", e)
        },
        /***
         * Параметры пояснения
         * @param data []byte Полученное сообщение rabbitmq
         * @param header map[string]interface{} Исходный заголовок rabbitmq
         * @param retryClient RabbitmqPool.RetryClientInterface Пользовательский интерфейс для повторной попытки данных, необходимо вернуть true, чтобы предотвратить повторную отправку данных
         ***/
        EventSuccess: func(data []byte, header map[string]interface{},retryClient kelleyRabbimqPool.RetryClientInterface)bool {// Если возвращается true, повторная попытка не требуется
            _ = retryClient.Ack()// Подтвердить сообщение    	
            fmt.Printf("data:%s\n", string(data))
        	return true
        },
	}
	instanceConsumePool.RegisterConsumeReceive(nomrl)

	err := instanceConsumePool.RunConsume()
	if err != nil {
		fmt.Println(err)
	}
  • Параметры объяснения
Имя Тип Объяснение
ExchangeName string Имя обмена
ExchangeType string Тип обмена:
EXCHANGE_TYPE_FANOUT
EXCHANGE_TYPE_DIRECT
EXCHANGE_TYPE_TOPIC
Route string Ключ маршрута
QueueName string Имя очереди
IsTry bool Перепробовать ли
Если включить повторные попытки, после успешного обратного вызова будет возвращено значение true для повторной отправки сообщения, время повторной попытки составляет 5000~15000 MS
IsAutoAck bool Автоматически подтверждать сообщения, true: компонент автоматически подтверждает сообщения в фоновом режиме
false: вручную подтвердить сообщение, успешно вызвать _ = retryClient.Ack()` для подтверждения сообщения
MaxReTry int Максимальное количество повторных попыток, требуется isTry=true
EventFail func Обратный вызов при ошибке
EventSuccess func Успешный обратный вызов
  1. Коды ошибок

Коды ошибок:

  1. Возвращаемый код при отправке сообщения производителем.
  2. Код, возвращаемый при прослушивании события потребителем.
Код ошибки Объяснение
501 Отправка сообщения производителем превысила максимальное количество повторных попыток
502 Не удалось получить канал, обычно из-за исчерпания количества очередей
503 Ошибка обмена/очереди/привязки
504 Ошибка подключения
506 Ошибка создания канала
507 Превышено максимальное количество повторных попыток

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Голанг-RabbitMQ: пул соединений и повторное использование каналов в кластере RabbitMQ. Развернуть Свернуть
GPL-2.0
Отмена

Обновления (8)

все

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/tym_hmm-rabbitmq-pool-go.git
git@api.gitlife.ru:oschina-mirror/tym_hmm-rabbitmq-pool-go.git
oschina-mirror
tym_hmm-rabbitmq-pool-go
tym_hmm-rabbitmq-pool-go
master