1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/artlongs-amq

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
amq-helper.md 14 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
gitlife-traslator Отправлено 29.11.2024 03:21 d0d74f6

Artfii-MQ (AMQ) — это легковесная, зависимая от небольшого количества ресурсов система обмена сообщениями.

  1. Система обеспечивает высокую производительность благодаря использованию технологии LMAX-Ringbuffer для распределения сообщений.
  2. По умолчанию срок жизни опубликованных сообщений составляет один день, что позволяет избежать накопления большого количества просроченных и недействительных сообщений, как в RocketMQ.
  3. После того как несколько потребителей подписываются на сообщение, оно автоматически удаляется (ALL-ACK). Также можно настроить пользовательский срок жизни сообщения (постоянный, ALL-ACK или «флеш»).
  4. Все сообщения по умолчанию автоматически помечаются как «доставленные» после успешного получения.
  5. Если клиент не подтверждает получение сообщения (NACK), можно установить количество повторных попыток и интервал между ними.
  6. В случае сбоя отправки (отключение потребителя) можно также настроить количество повторных попыток и их интервал.
  7. Особенностью AMQ является режим PING/PONG, который поддерживает функции, подобные RPC, но без недостатков, связанных с принудительным сопряжением RPC. Кроме того, этот режим работает эффективнее, чем вызовы RPC (идеальный партнёр для микросервисов).
  8. Обычный режим (издатель/подписчик) поддерживает массовую и высокопараллельную запись журналов ввода-вывода.
  9. AMQ имеет встроенные функции мониторинга трафика и управления через бэкенд.
  10. Чёрный список автоматически отклоняет подключения.
  11. Только после завершения проекта я обнаружил, что существует протокол MQTT (протокол для интернета вещей), и мой проект стал его примером.

Архитектура AMQ

  1. Обработка данных ввода-вывода через AIO.
  2. Преобразование данных ввода-вывода в объекты сообщений с помощью обработчика протокола.
  3. Распределение и обработка задач сообщений через RingBuffer.
  4. Централизованное сохранение сообщений.

Основные моменты разработки AMQ

  • Запуск AsynchronousServerSocketChannel для приёма сообщений от клиентов и создания AioPipe:
public class AioServer<T> implements Runnable {
    public void start() throws IOException {
          if (config.isBannerEnabled()) {
              LOGGER.info(config.BANNER + "\r\n :: amq-socket ::\t(" + config.VERSION + ")");
          }
          start0((AsynchronousSocketChannel channel)->new AioPipe<T>(channel, config,config.isSsl(),config.isServer()));
      }
}
  • Создание AioClient и возврат pipe:

public class AioClient<T> implements Runnable {
    
    /**
     * 启动客户端。
     * <p>
     * 在与服务端建立连接期间,该方法处于阻塞状态。直至连接建立成功,或者发生异常。
     * </p>
     * <p>
     * 该start方法支持外部指定AsynchronousChannelGroup,实现多个客户端共享一组线程池资源,有效提升资源利用率。
     * </p>
     *
     * @param asynchronousChannelGroup IO事件处理线程组
     */
    public AioPipe<T> start(AsynchronousChannelGroup asynchronousChannelGroup) throws IOException, ExecutionException, InterruptedException {
        this.asynchronousChannelGroup = asynchronousChannelGroup;
        this.socketChannel = AsynchronousSocketChannel.open(asynchronousChannelGroup);
        //set socket options
        if (config.getSocketOptions() != null) {
            for (SocketOption option : config.getSocketOptions()) {
                socketChannel.setOption(option, option.type());
            }
        } else {
            setDefSocketOptions(socketChannel);
        }
        //bind host
        socketChannel.connect(new InetSocketAddress(config.getHost(), config.getPort())).get();
        //连接成功则构造 AIO-PIPE 对象
        pipe = new AioPipe<>(socketChannel, config);
        pipe.setAioClient(this);
        pipe.startRead();
        // 启动断链重连TASK
        ClientReconectTask.start(pipe, config.getBreakReconnectMs());
        logger.warn("amq-socket client started on {} {}, pipeId:{}", config.getHost(), config.getPort(),pipe.getId());
        return pipe;
    }
}
  • Создание AioMqServer и запуск с загрузкой обработчиков сообщений (MqServerProcessor) и протокола (AioProtocol):
public class AioMqServer extends AioServer {
    public void start() {
            AioServer<ByteBuffer> aioServer = new AioServer(MqConfig.inst.host, MqConfig.inst.port, new AioProtocol(), new MqServerProcessor());
    }
}
  • Декодирование потока сообщений и отправка в обработчик сообщений:
/**
 * Func : Mq 消息处理
 *
 * @author: leeton on 2019/2/25.
 */
public class MqServerProcessor extends AioBaseProcessor<BaseMessage> {
    private static Logger logger = LoggerFactory.getLogger(MqServerProcessor.class);

    private void directSend(AioPipe pipe, Message message) {
        ProcessorImpl.INST.onMessage(pipe, message);
    }

}
  • Обработчик сообщений для дальнейшей обработки сообщений (публикация, подписка, сохранение...):
public enum ProcessorImpl implements Processor{

    public void onMessage(AioPipe pipe, Message message) {
        if (!shutdowNow && null != message) {
            if (MqConfig.inst.start_store_all_message_to_db) { // 持久化所有消息
                if (!message.subscribeTF()) {
                    tiggerStoreAllMsgToDb(persistent_worker, message);
                }
            }
            String msgId = message.getK().getId();
            if (message.ackMsgTF()) { // ACK 消息
                incrAck();
                if (Message.Life.SPARK == message.getLife()) {
                    removeSubscribeCacheOnAck(msgId);
                    removeDbDataOfDone(msgId);
                } else {
                    Integer clientNode = getNode(pipe);
                    upStatOfACK(clientNode,
``` Данный текст написан на языке Java.

Вот перевод текста на русский язык:

message); } } else { if (message.subscribeTF()) { // subscribe msg addSubscribeIF(pipe, message); if (isAcceptJob(message)) { // 如果工作任务已经先一步发布了,则触发-->直接把任务发给订阅者 incrAccpetJob(); triggerDirectSendJobToAcceptor(pipe, message); }else { incrCommonSubscribe(); } // return;

            } else {
                if (isPublishJob(message)) { // 发布的消息为工作任务(pingpong)
                    incrPublishJob();
                    cachePubliceJobMessage(msgId, message);
                    buildSubscribeWaitingJobResult(pipe, message);
                    Subscribe acceptor = getSubscribe(message.getK().getTopic());
                    if (null != acceptor) { // 本任务已经有订阅者
                        sendMessageToSubcribe(message,acceptor);
                        return;
                    }
                }else {
                    incrCommonPublish();
                    if (Message.Life.SPARK != message.getLife()) {
                        cacheCommonPublishMessage(msgId, message);
                    }
                }
                // 发布消息
                publishJobToWorker(message);
            }
        }
    } 
}

}


# Использование AMQ (пример SPRING-BOOT)

1. Создайте сервер и запустите его в первую очередь:

```java
public class MqStart {
    public static void main(String[] args) {
        AioMqServer.instance.start();
    }
}
  1. Создайте клиент:
@Component
public class AmqClient extends MqClientProcessor {

    public AmqClient() {
        try {
            final int threadSize = MqConfig.inst.client_connect_thread_pool_size;
            AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(threadSize, (r)->new Thread(r));
            AioMqClient<Message> client = new AioMqClient(new AioProtocol(), this);
            client.setBreakReconnectMs(5000); //устанавливаем время цикла разрыва соединения
            client.start(channelGroup);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
  1. Отправьте или получите сообщение:
/**
 * Func : Пример
 * Сначала выполните метод acceptjob, то есть сначала отправьте задачу на подписку, а затем выполните sendjob
 *
 * @author: leeton on 2019/4/1.
 */
@RestController
public class TestController {

    @Resource
    private AmqClient amqClient;

    @RequestMapping("/")
    public String hello(){
        return "Are u ok?";
    }

    /**
     * Подписчик
     * Получает JOB, выполняет его и отправляет результат издателю JOB
     * @return
     */
    @RequestMapping("/acceptjob")
    public String rec(){
        TestUser user = new TestUser(2, "alice");
        String jobTopc = "topic_get_userById";
        amqClient.acceptJob(jobTopc, (Message job)->{
            if (job != null) {
                System.err.println("accept a job: " +job);
                // Выполняем задание JOB
                if (user.getId().equals(job.getV())) {
                    amqClient.<TestUser>finishJob(jobTopc, user);
                }
            }
        });
        return "ok";
    }

    /**
     * Издатель
     * Публикует рабочую задачу и получает результаты выполнения
     * @return
     */
    @RequestMapping("/sendjob")
    public Map send(){
        Map<String, Object> result = new HashMap<>();
        Message message = amqClient.publishJob("topic_get_userById",2);
        result.put("sendjob", "topic_get_userById");
        result.put("result", message);
        return result;
    }

}

Введение в архитектуру Disruptor и технологию Ringbuffer (версия 3.0)

RingBuffer выглядит как кольцевая структура, но на самом деле это последовательный массив. Смотрите код:

 // Левый блок данных
 public class LhsPadding
 {
     protected long p1, p2, p3, p4, p5, p6, p7;
 }
 // Правый блок данных
 public class RhsPadding extends Value
 {
     protected long p9, p10, p11, p12, p13, p14, p15;
 }
 // Наконец, объединяем их в структуру последовательности Sequence, которая использует CAS для записи данных
 public class Sequence extends RhsPadding {
``` ```
public boolean compareAndSet(final long expectedValue, final long newValue) {
    return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}

// Окончательная передача RingBuffer классу для упаковки Sequence и выполнения операций чтения и записи:

    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;

    /**
     * Создание RingBuffer с полным набором опций.
     */
    RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
        super(eventFactory, sequencer);
    }

    public E get(long sequence) {
        return elementAt(sequence);
    }

    public void publishEvent(EventTranslator<E> translator) {
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence);
    }
}```

[Схема disruptor 3.0](img/distuptor3.0.png)

[Пример использования Disruptor и Ringbuffer](https://www.cnblogs.com/haiq/p/4112689.html)

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/artlongs-amq.git
git@api.gitlife.ru:oschina-mirror/artlongs-amq.git
oschina-mirror
artlongs-amq
artlongs-amq
master