Artfii-MQ (AMQ) — это легковесная, зависимая от небольшого количества ресурсов система обмена сообщениями.
Архитектура AMQ
Основные моменты разработки AMQ
public class AioServer<T> implements Runnable {
public void start() throws IOException {
if (config.isBannerEnabled()) {
LOGGER.info(config.BANNER + "\r\n :: amq-socket ::\t(" + config.VERSION + ")");
}
start0((AsynchronousSocketChannel channel)->new AioPipe<T>(channel, config,config.isSsl(),config.isServer()));
}
}
public class AioClient<T> implements Runnable {
/**
* 启动客户端。
* <p>
* 在与服务端建立连接期间,该方法处于阻塞状态。直至连接建立成功,或者发生异常。
* </p>
* <p>
* 该start方法支持外部指定AsynchronousChannelGroup,实现多个客户端共享一组线程池资源,有效提升资源利用率。
* </p>
*
* @param asynchronousChannelGroup IO事件处理线程组
*/
public AioPipe<T> start(AsynchronousChannelGroup asynchronousChannelGroup) throws IOException, ExecutionException, InterruptedException {
this.asynchronousChannelGroup = asynchronousChannelGroup;
this.socketChannel = AsynchronousSocketChannel.open(asynchronousChannelGroup);
//set socket options
if (config.getSocketOptions() != null) {
for (SocketOption option : config.getSocketOptions()) {
socketChannel.setOption(option, option.type());
}
} else {
setDefSocketOptions(socketChannel);
}
//bind host
socketChannel.connect(new InetSocketAddress(config.getHost(), config.getPort())).get();
//连接成功则构造 AIO-PIPE 对象
pipe = new AioPipe<>(socketChannel, config);
pipe.setAioClient(this);
pipe.startRead();
// 启动断链重连TASK
ClientReconectTask.start(pipe, config.getBreakReconnectMs());
logger.warn("amq-socket client started on {} {}, pipeId:{}", config.getHost(), config.getPort(),pipe.getId());
return pipe;
}
}
public class AioMqServer extends AioServer {
public void start() {
AioServer<ByteBuffer> aioServer = new AioServer(MqConfig.inst.host, MqConfig.inst.port, new AioProtocol(), new MqServerProcessor());
}
}
/**
* Func : Mq 消息处理
*
* @author: leeton on 2019/2/25.
*/
public class MqServerProcessor extends AioBaseProcessor<BaseMessage> {
private static Logger logger = LoggerFactory.getLogger(MqServerProcessor.class);
private void directSend(AioPipe pipe, Message message) {
ProcessorImpl.INST.onMessage(pipe, message);
}
}
public enum ProcessorImpl implements Processor{
public void onMessage(AioPipe pipe, Message message) {
if (!shutdowNow && null != message) {
if (MqConfig.inst.start_store_all_message_to_db) { // 持久化所有消息
if (!message.subscribeTF()) {
tiggerStoreAllMsgToDb(persistent_worker, message);
}
}
String msgId = message.getK().getId();
if (message.ackMsgTF()) { // ACK 消息
incrAck();
if (Message.Life.SPARK == message.getLife()) {
removeSubscribeCacheOnAck(msgId);
removeDbDataOfDone(msgId);
} else {
Integer clientNode = getNode(pipe);
upStatOfACK(clientNode,
``` Данный текст написан на языке Java.
Вот перевод текста на русский язык:
message); } } else { if (message.subscribeTF()) { // subscribe msg addSubscribeIF(pipe, message); if (isAcceptJob(message)) { // 如果工作任务已经先一步发布了,则触发-->直接把任务发给订阅者 incrAccpetJob(); triggerDirectSendJobToAcceptor(pipe, message); }else { incrCommonSubscribe(); } // return;
} else {
if (isPublishJob(message)) { // 发布的消息为工作任务(pingpong)
incrPublishJob();
cachePubliceJobMessage(msgId, message);
buildSubscribeWaitingJobResult(pipe, message);
Subscribe acceptor = getSubscribe(message.getK().getTopic());
if (null != acceptor) { // 本任务已经有订阅者
sendMessageToSubcribe(message,acceptor);
return;
}
}else {
incrCommonPublish();
if (Message.Life.SPARK != message.getLife()) {
cacheCommonPublishMessage(msgId, message);
}
}
// 发布消息
publishJobToWorker(message);
}
}
}
}
}
# Использование AMQ (пример SPRING-BOOT)
1. Создайте сервер и запустите его в первую очередь:
```java
public class MqStart {
public static void main(String[] args) {
AioMqServer.instance.start();
}
}
@Component
public class AmqClient extends MqClientProcessor {
public AmqClient() {
try {
final int threadSize = MqConfig.inst.client_connect_thread_pool_size;
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(threadSize, (r)->new Thread(r));
AioMqClient<Message> client = new AioMqClient(new AioProtocol(), this);
client.setBreakReconnectMs(5000); //устанавливаем время цикла разрыва соединения
client.start(channelGroup);
} catch (IOException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* Func : Пример
* Сначала выполните метод acceptjob, то есть сначала отправьте задачу на подписку, а затем выполните sendjob
*
* @author: leeton on 2019/4/1.
*/
@RestController
public class TestController {
@Resource
private AmqClient amqClient;
@RequestMapping("/")
public String hello(){
return "Are u ok?";
}
/**
* Подписчик
* Получает JOB, выполняет его и отправляет результат издателю JOB
* @return
*/
@RequestMapping("/acceptjob")
public String rec(){
TestUser user = new TestUser(2, "alice");
String jobTopc = "topic_get_userById";
amqClient.acceptJob(jobTopc, (Message job)->{
if (job != null) {
System.err.println("accept a job: " +job);
// Выполняем задание JOB
if (user.getId().equals(job.getV())) {
amqClient.<TestUser>finishJob(jobTopc, user);
}
}
});
return "ok";
}
/**
* Издатель
* Публикует рабочую задачу и получает результаты выполнения
* @return
*/
@RequestMapping("/sendjob")
public Map send(){
Map<String, Object> result = new HashMap<>();
Message message = amqClient.publishJob("topic_get_userById",2);
result.put("sendjob", "topic_get_userById");
result.put("result", message);
return result;
}
}
RingBuffer выглядит как кольцевая структура, но на самом деле это последовательный массив. Смотрите код:
// Левый блок данных
public class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
// Правый блок данных
public class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
// Наконец, объединяем их в структуру последовательности Sequence, которая использует CAS для записи данных
public class Sequence extends RhsPadding {
``` ```
public boolean compareAndSet(final long expectedValue, final long newValue) {
return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}
// Окончательная передача RingBuffer классу для упаковки Sequence и выполнения операций чтения и записи:
public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
protected long p1, p2, p3, p4, p5, p6, p7;
/**
* Создание RingBuffer с полным набором опций.
*/
RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
super(eventFactory, sequencer);
}
public E get(long sequence) {
return elementAt(sequence);
}
public void publishEvent(EventTranslator<E> translator) {
final long sequence = sequencer.next();
translateAndPublish(translator, sequence);
}
}```
[Схема disruptor 3.0](img/distuptor3.0.png)
[Пример использования Disruptor и Ringbuffer](https://www.cnblogs.com/haiq/p/4112689.html)
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )