Слияние кода завершено, страница обновится автоматически
package RabbitmqRoute
import (
"bytes"
"encoding/json"
"fmt"
kelleyRabbimqPool "gitee.com/tym_hmm/rabbitmq-pool-go"
"strings"
"sync"
)
const (
//rabbitmq连接错误
CODE_RABBITMQ_CONNECTION_ERROR = 50100
//数据解析错误
CODE_RABBITMQ_PARSES_DATA_ERROR = 50101
//未有可处理的任务节点
CODE_RABBITMQ_NODE_INFO_ERROR = 50102
//rabbitmq 客户端错误
CODE_RABBITMQ_RECEIVED_ERROR = 51013
)
/**
当单消费节点
*/
type NodeInfo struct {
NodeName string //当前节点名称
ExchangeName string //交换机
ExchangeType string //交换机类型
Route string //路由
QueueName string //队列名称
VirtualHosts string //虚拟host
IsTry bool //是否重试
MaxReTry int32 //最大重式次数
IsAutoAck bool //是否自动确认
}
/**
异常错误
*/
type RabbitMqTaskError struct {
Code int
Message string
}
func newError(code int, message string) *RabbitMqTaskError {
return &RabbitMqTaskError{
Code: code,
Message: message,
}
}
func (dcte *RabbitMqTaskError) Error() string {
return fmt.Sprintf("Exception (%d) Reason: %s", dcte.Code, dcte.Message)
}
/**
数据中心任务
以rabbitmq注册
*/
type dataCenterTask struct {
host string
port int
user string
pwd string
virtualHosts string
maxConnection int32
maxChannel int32
consumerClient *kelleyRabbimqPool.RabbitPool
handleNode *NodeInfo
handleAop HandlerAop
routeEngine *TaskEngine
}
/**
实例化数据中心
*/
//var dataCenterOnce sync.Once
//var dataCenterT *dataCenterTask
func NewTask(host string, port int, user, pwd string) *dataCenterTask {
//dataCenterOnce.Do(func() {
dataCenterT := &dataCenterTask{}
dataCenterT.host = host
dataCenterT.port = port
dataCenterT.user = user
dataCenterT.pwd = pwd
dataCenterT.routeEngine = NewDataCenterEngine()
//})
return dataCenterT
}
/**
添加一个处理任务
*/
func (d *dataCenterTask) SetHandleNode(nodeInfo *NodeInfo) {
d.handleNode = nodeInfo
}
func (d *dataCenterTask) SetMaxConnection(maxConnection int32) {
d.maxConnection = maxConnection
}
func (d *dataCenterTask) MaxChannel(maxChannel int32) {
d.maxChannel = maxChannel
}
/**
路由注册
*/
func (d *dataCenterTask) RouteRegister(fun func(taskEngine *TaskEngine)) {
fun(d.routeEngine)
}
/**
设置路由切面
*/
func (d *dataCenterTask) SetHandleAop(handler HandlerAop) {
d.handleAop = handler
}
/**
任务入口
*/
func (d *dataCenterTask) Enter() *RabbitMqTaskError {
if d.handleNode == nil {
return newError(CODE_RABBITMQ_NODE_INFO_ERROR, "no node info handle, please call SetHandleNode()")
}
virtualHosts := "/"
if len(strings.TrimSpace(d.handleNode.VirtualHosts)) > 0 {
virtualHosts = d.handleNode.VirtualHosts
}
consumer := newDataCenterConsumer("rabbitmq-route", d, d.handleNode.NodeName, d.handleNode.ExchangeName, d.handleNode.ExchangeType, d.handleNode.QueueName, d.handleNode.Route, virtualHosts, d.handleNode.IsTry, d.handleNode.MaxReTry, d.handleNode.IsAutoAck)
if d.handleAop != nil {
d.routeEngine.handlerAop = d.handleAop
}
consumer.routeRegister(d.routeEngine)
consumer.connection(d.host, d.port, d.user, d.pwd, virtualHosts, d.maxConnection, d.maxChannel)
if consumer.consumePoolErr != nil {
return newError(CODE_RABBITMQ_CONNECTION_ERROR, fmt.Sprintf("rabbitmq connection error , %s", consumer.consumePoolErr.Error()))
}
var wg sync.WaitGroup
wg.Add(1)
go func(k int, info *NodeInfo) {
defer wg.Done()
consumer.handle()
}(0, d.handleNode)
wg.Wait()
return nil
}
/**
数据消费
*/
type dataCenterConsumer struct {
dataCenterTask *dataCenterTask
NodeName string
routeEngine *TaskEngine
BaseConsumer
consumePoolOnce sync.Once
consumeInstance *kelleyRabbimqPool.RabbitPool
consumePoolErr error
}
/**
初始化数据中心消费者
*/
func newDataCenterConsumer(taskName string, dataCenterTask *dataCenterTask, nodeName string, exChangeName, exChangeType, queueName, routeKey, virtualHosts string, isTry bool, maxRetry int32, IsAutoAck bool) *dataCenterConsumer {
return &dataCenterConsumer{BaseConsumer: struct {
TaskName string
ExchangeName string
ExChangeType string
VirtualHosts string
Queue string
RouteKey string
IsTry bool
MaxReTry int32
IsAutoAck bool
}{TaskName: taskName, ExchangeName: exChangeName, ExChangeType: exChangeType, VirtualHosts: virtualHosts, Queue: queueName, RouteKey: routeKey, IsTry: isTry, MaxReTry: maxRetry, IsAutoAck: IsAutoAck},
NodeName: nodeName,
dataCenterTask: dataCenterTask,
}
}
/**
初始化注册
*/
func (d *dataCenterConsumer) connection(host string, port int, user, pwd, virtualHosts string, maxConnection, maxChannel int32) {
d.consumePoolOnce.Do(func() {
instancePool := kelleyRabbimqPool.NewConsumePool()
if maxChannel <= 0 {
maxChannel = 25
}
if maxConnection <= 0 {
maxConnection = 5
}
instancePool.SetMaxConnection(maxConnection)
instancePool.SetMaxConsumeChannel(maxChannel)
var err error
if len(strings.TrimSpace(virtualHosts)) > 0 {
err = instancePool.ConnectVirtualHost(host, port, user, pwd, virtualHosts)
} else {
err = instancePool.Connect(host, port, user, pwd)
}
if err != nil {
d.consumePoolErr = err
d.consumeInstance = nil
} else {
d.consumePoolErr = nil
d.consumeInstance = instancePool
}
})
}
func (d *dataCenterConsumer) routeRegister(routeEngine *TaskEngine) {
d.routeEngine = routeEngine
d.routeEngine.CreateRouter()
}
/**
运行消费者监听
*/
func (d *dataCenterConsumer) handle() {
d.Receive(d.consumeInstance, func(receiveData []byte, header map[string]interface{}, retryClient kelleyRabbimqPool.RetryClientInterface) bool { //这里执行成功处理
if len(receiveData) == 0 {
logLine("消费数据为空")
return true
}
requestData := &dataRequest{}
//解析数据到对象
dataReader := bytes.NewReader(receiveData)
err := json.NewDecoder(dataReader).Decode(&requestData)
requestData.Header = header
//数据解析失败
if err != nil {
//newError(CODE_RABBITMQ_PARSES_DATA_ERROR, string(receiveData)).Error()
logLineF("node Name parse request data error: %s", d.NodeName, err)
return true
} else {
//这里需要有处理是否重试的逻辑验证
return d.routeEngine.Handle(requestData, retryClient, d)
}
}, func(code int, err error) { //这里执行失败处理
logLineF("nodeName %s, event error, message:%s", d.NodeName, newError(code, err.Error()))
})
}
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )