1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/redkale-redkale

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
agent-message.md 3.5 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
gitlife-traslator Отправлено 29.11.2024 11:56 84b3ad0

Очередь сообщений

MessageAgent — это абстрактный интерфейс центра сообщений.

Конфигурация

<mq name="mymq" type="kafka">
    <servers value="127.0.0.1:9092"/>
    <consumer autoload="true"/>
</mq>

Потребление сообщений

Через @ResourceConsumer и MessageConsumer интерфейсы реализуется потребление.

@ResourceConsumer(mq = "mymq", topics = "test_bean_topic")
public class TestMessageConsumer implements MessageConsumer<TestBean> {

    @Override
    public void init(AnyValue config) {
        System.out.println("Выполнение TestMessageConsumer.init");
    }

    @Override
    public void onMessage(MessageEvent<TestBean>[] events) {
        for (MessageEvent<TestBean> event : events) {
            System.out.println("Потребление сообщения, сообщение: " + event.getMessage());
        }
    }

    @Override
    public void destroy(AnyValue config) {
        System.out.println("Выполнение TestMessageConsumer.destroy");
    }
}

Через метод в Service с пометкой @Messaged реализуется потребление, метод может быть только protected или public, не может быть final или static.

public class TestMessageService extends AbstractService {

    @Messaged(mq = "mymq", topics = "test_bean_topic")
    protected void runMessage(MessageEvent<TestBean>[] events) {
        for (MessageEvent<TestBean> event : events) {
            System.out.println("Потребление сообщения, сообщение: " + event.getMessage());
        }
    }
}

Через метод в @Component Service с пометкой @Messaged реализуется потребление, метод должен быть public.

@Component
public final class TestMessageService extends AbstractService {

    @Messaged(mq = "mymq", topics = "test_bean_topic")
    public int runMessage(MessageEvent<TestBean>[] events) {
        for (MessageEvent<TestBean> event : events) {
            System.out.println("Потребление сообщения, сообщение: " + event.getMessage());
        }
        return 0;
    }
}

Создание сообщений

Через метод в @Component Service с пометкой @Messaged реализуется создание, метод должен быть public.

@Component
public class TestMessageService extends AbstractService {

    @ResourceProducer(mq = "mymq")
    private MessageProducer producer;

    public void sendMessage() {
        TestBean bean = new TestBean(12345, "this is a message");
        System.out.println("Создание сообщения: " + bean);
        producer.sendMessage("test_bean_topic", bean);
    }
}

Управление темами

Через MessageManager осуществляется управление темами.

public class TestMessageManager extends AbstractService {

    @Resource(name = "mymq")
    private MessageManager manager;

    // Создание темы
    public void initTopic() {
        manager.createTopic("topic_1", "topic_2").join();
    }

    // Удаление темы
    public void deleteTopic() {
        manager.deleteTopic("topic_1", "topic_2").join();
    }

    // Запрос темы
    public void printTopic() {
        List<String> topics = manager.queryTopic().join();
    }
}

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/redkale-redkale.git
git@api.gitlife.ru:oschina-mirror/redkale-redkale.git
oschina-mirror
redkale-redkale
redkale-redkale
main