MessageAgent — это абстрактный интерфейс центра сообщений.
<mq name="mymq" type="kafka">
<servers value="127.0.0.1:9092"/>
<consumer autoload="true"/>
</mq>
Через @ResourceConsumer
и MessageConsumer
интерфейсы реализуется потребление.
@ResourceConsumer(mq = "mymq", topics = "test_bean_topic")
public class TestMessageConsumer implements MessageConsumer<TestBean> {
@Override
public void init(AnyValue config) {
System.out.println("Выполнение TestMessageConsumer.init");
}
@Override
public void onMessage(MessageEvent<TestBean>[] events) {
for (MessageEvent<TestBean> event : events) {
System.out.println("Потребление сообщения, сообщение: " + event.getMessage());
}
}
@Override
public void destroy(AnyValue config) {
System.out.println("Выполнение TestMessageConsumer.destroy");
}
}
Через метод в Service с пометкой @Messaged
реализуется потребление, метод может быть только protected
или public
, не может быть final
или static
.
public class TestMessageService extends AbstractService {
@Messaged(mq = "mymq", topics = "test_bean_topic")
protected void runMessage(MessageEvent<TestBean>[] events) {
for (MessageEvent<TestBean> event : events) {
System.out.println("Потребление сообщения, сообщение: " + event.getMessage());
}
}
}
Через метод в @Component
Service с пометкой @Messaged
реализуется потребление, метод должен быть public
.
@Component
public final class TestMessageService extends AbstractService {
@Messaged(mq = "mymq", topics = "test_bean_topic")
public int runMessage(MessageEvent<TestBean>[] events) {
for (MessageEvent<TestBean> event : events) {
System.out.println("Потребление сообщения, сообщение: " + event.getMessage());
}
return 0;
}
}
Через метод в @Component
Service с пометкой @Messaged
реализуется создание, метод должен быть public
.
@Component
public class TestMessageService extends AbstractService {
@ResourceProducer(mq = "mymq")
private MessageProducer producer;
public void sendMessage() {
TestBean bean = new TestBean(12345, "this is a message");
System.out.println("Создание сообщения: " + bean);
producer.sendMessage("test_bean_topic", bean);
}
}
Через MessageManager
осуществляется управление темами.
public class TestMessageManager extends AbstractService {
@Resource(name = "mymq")
private MessageManager manager;
// Создание темы
public void initTopic() {
manager.createTopic("topic_1", "topic_2").join();
}
// Удаление темы
public void deleteTopic() {
manager.deleteTopic("topic_1", "topic_2").join();
}
// Запрос темы
public void printTopic() {
List<String> topics = manager.queryTopic().join();
}
}
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )