Spring Integration 5 – Message Channels, Introduction and Common Properties

This article is an article in a series of articles introducing Spring Integration 5. I imagine most of the examples can be applied to earlier versions of Spring Integration as well, it just happens that I started out with Spring Integration 5 and I have little reason to investigate compatibility with previous versions.

In this article I will be introducing Spring Integration message channels and examine the common properties found in all message channels.

Examples

All the examples in this series of articles are taken from my Spring Integration Scrutinized repository that is available on GitHub.

Types of Message Channels

Message channels in Spring Integration can be divided into two main categories:

  • Subscribable message channels
  • Pollable message channels

Subscribable message channels are message channels to which one or more consumers register themselves, subscribe, to receive the messages posted onto the message channel. A message posted to the message channel is only delivered to one of the clients that have subscribed to the message channel prior to the message being posted to the message channel.

Subscribable message channel with three consumers.

Consumer 3 consumes the message posted to the message channel by the producer.

Pollable message channels store the messages posted to the message channel until a consumer comes and requests a message from the message channel. Consumers does not subscribe to a pollable message channel.

Pollable message channel to which a producer posts a message. A consumer polls the message channel at some later point in time and consumes the message posted to the message channel earlier.

In each of these two message channel categories there are several different types of message channels, as we will see later. First we’ll have a look at the similarities.

Common Properties of Message Channels

All Spring Integration message channels implement the MessageChannel interface found in the package org.springframework.messaging . The inheritance hierarchy of MessageChannel can be seen in the figure below.

Spring Integration 5 message channels class hierarchy.

Note the PollableChannel and SubscribableChannel interfaces being the roots of the two main message channel categories. In addition, also note the FluxMessageChannel class, which is a concrete message channel implementation which does not implement PollableChannel nor a SubscribableChannel .

The MessageChannel interface only contains two send methods with the following signatures:

  • boolean send(Message message)
    Sends a message to the message channel with an indefinite timeout and return true if sending the message was successful.
  • boolean send(Message message, long timeout)
    Sends a message to the message channel with a specified timeout.

Sending one or more messages is done in almost all the examples related to message channels and no special examples will be given.

The AbstractMessageChannel class is the class from which all the different types of message channels inherit. It implements support for the following features:

  • Retaining a history of messages sent through the message channel (tracking).
  • Message channel statistics.
  • Logging.
    Outputs log messages during the different stages of sending a message to the message channel if logging is enabled and the log level is set to debug or lower.
  • Configuring one or more message datatypes accepted by the message channel.
    In addition, a message converter can be configured on message channels which will attempt to convert any message of other datatype sent to the message channel to a datatype accepted by the message channel.
  • Interceptors.

Message History

This example from the MessageChannelsCommonTests class shows how to enable and retrieve message history for a message channel.

    /**
     * Tests enabling message history for a message channel.
     * Note that in this test, message history is only enabled for one single message channel.
     * To enable message history for all message channels, use the {@code @EnableMessageHistory}
     * annotation on a configuration class.
     *
     * Expected result: One message history entry should be generated for the message sent
     * and it should contain the name of the message channel to which the message was sent.
     *
     * @throws Exception If an error occurs. Indicates test failure.
     */
    @Test
    public void messageHistoryTest() throws Exception {
        final AbstractMessageChannel theMessageChannel;
        final Message theInputMessage;
        final List theSubscriberReceivedMessages =
            new CopyOnWriteArrayList();
 
        /*
         * Create the message channel and enable message history for the individual message channel.
         */
        theMessageChannel = new DirectChannel();
        theMessageChannel.setComponentName(DIRECT_CHANNEL_NAME);
        theMessageChannel.setShouldTrack(true);
 
        /*
         * Create a subscriber (message handler) that adds each received message
         * to a list. Register the subscriber with the subscribable message channel.
         */
        final MessageHandler theSubscriber = theSubscriberReceivedMessages::add;
        ((DirectChannel)theMessageChannel).subscribe(theSubscriber);
 
        /* Send a message to the channel. */
        theInputMessage = MessageBuilder.withPayload(GREETING_STRING).build();
        theMessageChannel.send(theInputMessage);
 
        await().atMost(2, TimeUnit.SECONDS).until(() ->
            theSubscriberReceivedMessages.size() > 0);
 
        final Message theFirstReceivedMessage = theSubscriberReceivedMessages.get(0);
        final MessageHistory theFirstReceivedMessageHistory =
            MessageHistory.read(theFirstReceivedMessage);
        final Properties theMessageHistoryEntry = theFirstReceivedMessageHistory.get(0);
 
        LOGGER.info("Message history object: " + theFirstReceivedMessageHistory);
        LOGGER.info("Message history entry: " + theMessageHistoryEntry);
 
        Assert.assertEquals("Message history entry should be for our message channel",
            DIRECT_CHANNEL_NAME, theMessageHistoryEntry.getProperty("name"));
    }

Note that:

  • The DirectChannel message channel used in this example is named using the setComponentName method.
    This is the name that will appear in the message history.
  • Message history is enabled for the individual message channel by calling the setShouldTrack method with the parameter true.
  • One single message is sent to the message channel.
    Thus we can expect one single message history entry to be generated as a result.
  • The message history object and a message history entry are logged.
    This will produce console log that will be examined below.
  • The name property of the message history entry is asserted to be equal to the message channel name.

If the test is run, the two following lines can be seen in the console log:

2017-12-25 19:20:23.006  INFO 1158 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Message history object: MyDirectChannel
2017-12-25 19:20:23.006  INFO 1158 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Message history entry: {name=MyDirectChannel, type=channel, timestamp=1514226022884}

From this one can conclude that the message history object contains the message history for an object named MyDirectChannel, which is the name of the direct message channel created in the test. If there are multiple message history entries in the message history object, the string representation will be a comma-separated list of the name properties from the entries.

The subsequent log-line shows that one message history entry contains three properties:

  • Name
    The name is the name of the component that logged the message history entry.
  • Type
    The type is the type of the component that logged the message history entry.
  • Timestamp
    The timestamp is the time at which the message history entry was logged.

Message Channel Statistics

There are two alternatives as far as message channel statistics are concerned; full statistics and counts only. The former includes not only gathering of counts and timing information but will also calculate mean values. The counts-only option will, as the name implies, only maintain counts of, for instance, how many messages that have passed through the message channel.

Full Message Channel Statistics

From the MessageChannelsCommonTests class comes this example showing how to enable and retrieve full statistics for a message channel:

    /**
     * Tests gathering of full statistics for a message channel.
     *
     * Expected result: Full statistics, including message counts and a calculated
     * mean values, should be maintained by the message channel.
     */
    @Test
    public void gatherMessageChannelFullStatisticsTest() {
        final AbstractMessageChannel theMessageChannel;
        final DefaultMessageChannelMetrics theMessageChannelMetrics;
        final List theSubscriberReceivedMessages =
            new CopyOnWriteArrayList();
 
        /* Create the message channel. */
        theMessageChannel = new DirectChannel();
        theMessageChannel.setComponentName(DIRECT_CHANNEL_NAME);
 
        /*
         * Create the object responsible for gathering statistics for the message channel.
         * It is not necessary to create and set a message channel metrics object on a
         * message channel, since one will be created when the message channel is created.
         * However, it is not possible to retrieve the default message channel metrics
         * object.
         */
        theMessageChannelMetrics = new DefaultMessageChannelMetrics("DirectChannelMetrics");
        theMessageChannel.configureMetrics(theMessageChannelMetrics);
 
        /*
         * Enable gathering of full statistics for the message channel.
         * If using a custom message channel metrics object like done in this example,
         * the message channel metrics object has to be set on the message channel prior
         * to enabling statistics.
         * To enable only simple metrics, i.e. counts, on a message channel, use the
         * {@code setCountsEnabled} method instead.
         */
        theMessageChannel.setStatsEnabled(true);
 
        /*
         * Create a subscriber (message handler) that adds each received message
         * to a list. Register the subscriber with the subscribable message channel.
         */
        final MessageHandler theSubscriber = theSubscriberReceivedMessages::add;
        ((DirectChannel)theMessageChannel).subscribe(theSubscriber);
 
        sendSomeMessagesToMessageChannelWithRandomDelay(theMessageChannel);
 
 
        await().atMost(2, TimeUnit.SECONDS).until(() ->
            theSubscriberReceivedMessages.size() >= METRICSTEST_MESSAGE_COUNT);
 
        /*
         * Check for some metrics from the message channel.
         * With simple, non-full, metrics only counts, for instance, the
         * number of messages sent will be maintained.
         * With full statistics, additional metrics will also be maintained, such as
         * duration and mean duration of send operations on the message channel.
         */
        Assert.assertEquals("Metrics number of messages sent should match",
            METRICSTEST_MESSAGE_COUNT, theMessageChannelMetrics.getSendCount());
        Assert.assertTrue("Metrics mean send duration should be greater than zero",
            theMessageChannelMetrics.getMeanSendDuration() > 0);
 
        /* Retrieve some metrics from the message channel metrics object. */
        LOGGER.info("*** Metrics from the message channel metrics object:");
        LOGGER.info("Message channel metrics object: " + theMessageChannelMetrics);
        LOGGER.info("Send duration: " + theMessageChannelMetrics.getSendDuration());
        LOGGER.info("Error rate: " + theMessageChannelMetrics.getErrorRate());
        LOGGER.info("Send rate: " + theMessageChannelMetrics.getSendRate());
 
        /* Retrieve some metrics from the message channel itself. */
        LOGGER.info("*** Metrics from the message channel:");
        LOGGER.info("Send duration: " + theMessageChannel.getSendDuration());
        LOGGER.info("Error rate: " + theMessageChannel.getErrorRate());
        LOGGER.info("Send rate: " + theMessageChannel.getSendRate());
        LOGGER.info("Mean send rate: " + theMessageChannel.getMeanSendRate());
        LOGGER.info("Mean error rate: " + theMessageChannel.getMeanErrorRate());
        LOGGER.info("Mean error ratio: " + theMessageChannel.getMeanErrorRatio());
        LOGGER.info("Min send duration: " + theMessageChannel.getMinSendDuration());
        LOGGER.info("Mean send duration: " + theMessageChannel.getMeanSendDuration());
        LOGGER.info("Max send duration: " + theMessageChannel.getMaxSendDuration());
        LOGGER.info("Send count: " + theMessageChannel.getSendCount());
        LOGGER.info("Error count: " + theMessageChannel.getSendErrorCount());
    }

Note that:

  • After the message channel has been created, a DefaultMessageChannelMetrics object is created and set on the message channel using the configureMetrics method.
  • Having set the channel metrics object on the message channel, statistics is enabled on the message channel using the method setStatsEnabled .
    It is necessary to set the channel metrics object before enabling statistics, otherwise the channel metrics object will not be configured properly.
  • A number of messages are sent to the message channel.
    There is a random delay between each message to make the statistics more realistic.
  • Some simple assertions are made on the message channel metrics object.
  • Metrics are retrieved from the message channel metrics object and logged to the console.
    This is one, probably the less common, way metrics for a message channel can be retrieved.
  • Metrics are retrieved from the message channel itself and logged to the console.
    Message channels have a number of delegate methods which will retrieve metrics from the message channel metrics object, so it is not necessary to keep a reference to this object.

When the test is run, the following lines may be observed in the console:

2017-12-27 13:02:20.383  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : *** Metrics from the message channel metrics object:
2017-12-27 13:02:20.384  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Message channel metrics object: MessageChannelMonitor: [name=DirectChannelMetrics, sends=40]
2017-12-27 13:02:20.384  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Send duration: [N=40, min=0.002706, max=0.244026, mean=0.018557, sigma=0.014861]
2017-12-27 13:02:20.385  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Error rate: [N=0, min=0.000000, max=0.000000, mean=0.000000, sigma=0.000000]
2017-12-27 13:02:20.385  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Send rate: [N=40, min=0.000024, max=0.095514, mean=20.336082, sigma=1.257225]
2017-12-27 13:02:20.385  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : *** Metrics from the message channel:
2017-12-27 13:02:20.386  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Send duration: [N=40, min=0.002706, max=0.244026, mean=0.018557, sigma=0.014861]
2017-12-27 13:02:20.386  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Error rate: [N=0, min=0.000000, max=0.000000, mean=0.000000, sigma=0.000000]
2017-12-27 13:02:20.386  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Send rate: [N=40, min=0.000024, max=0.095514, mean=20.325428, sigma=1.257225]
2017-12-27 13:02:20.386  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Mean send rate: 20.32185756779332
2017-12-27 13:02:20.387  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Mean error rate: 0.0
2017-12-27 13:02:20.387  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Mean error ratio: 0.0
2017-12-27 13:02:20.387  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Min send duration: 0.002706
2017-12-27 13:02:20.387  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Mean send duration: 0.018557174643781548
2017-12-27 13:02:20.387  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Max send duration: 0.244026
2017-12-27 13:02:20.387  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Send count: 40
2017-12-27 13:02:20.388  INFO 1017 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Error count: 0

We can see that statistics is indeed gathered and, as far as the mean values, calculated. It will come as no surprise to see that the same information is retrieved from both the message channel metrics object and the message channel itself.

Simple Message Channel Statistics

From the MessageChannelsCommonTests class comes this example showing how to enable and retrieve simple statistics for a message channel:

    /**
     * Tests gathering of simple statistics for a message channel.
     *
     * Expected result: Only message counts will be maintained for the message channel
     */
    @Test
    public void gatherMessageChannelSimpleStatisticsTest() {
        final AbstractMessageChannel theMessageChannel;
        final List theSubscriberReceivedMessages =
            new CopyOnWriteArrayList();
 
        /* Create the message channel. */
        theMessageChannel = new DirectChannel();
        theMessageChannel.setComponentName(DIRECT_CHANNEL_NAME);
 
        /*
         * Enable simple statistics for the message channel.
         * Note that no message channel metrics object is configured on the message
         * channel.
         */
        theMessageChannel.setCountsEnabled(true);
 
        /*
         * Create a subscriber (message handler) that adds each received message
         * to a list. Register the subscriber with the subscribable message channel.
         */
        final MessageHandler theSubscriber = theSubscriberReceivedMessages::add;
        ((DirectChannel)theMessageChannel).subscribe(theSubscriber);
 
        sendSomeMessagesToMessageChannelWithRandomDelay(theMessageChannel);
 
        await().atMost(2, TimeUnit.SECONDS).until(() ->
            theSubscriberReceivedMessages.size() >= METRICSTEST_MESSAGE_COUNT);
 
        /*
         * Check metrics from the message channel.
         * With simple metrics only counts, for instance, the
         * number of messages sent, will be maintained.
         */
        Assert.assertEquals("Metrics number of messages sent should match",
            METRICSTEST_MESSAGE_COUNT, theMessageChannel.getSendCount());
 
        /* Retrieve some metrics from the message channel itself. */
        LOGGER.info("*** Metrics from the message channel:");
        LOGGER.info("Send duration: " + theMessageChannel.getSendDuration());
        LOGGER.info("Error rate: " + theMessageChannel.getErrorRate());
        LOGGER.info("Send rate: " + theMessageChannel.getSendRate());
        LOGGER.info("Mean send rate: " + theMessageChannel.getMeanSendRate());
        LOGGER.info("Mean error rate: " + theMessageChannel.getMeanErrorRate());
        LOGGER.info("Mean error ratio: " + theMessageChannel.getMeanErrorRatio());
        LOGGER.info("Min send duration: " + theMessageChannel.getMinSendDuration());
        LOGGER.info("Mean send duration: " + theMessageChannel.getMeanSendDuration());
        LOGGER.info("Max send duration: " + theMessageChannel.getMaxSendDuration());
        LOGGER.info("Send count: " + theMessageChannel.getSendCount());
        // TODO remove this log line when version containing fix for INT-4373 is released.
        LOGGER.info("NOTE! Due to INT-4373 the error count is wrong.");
        LOGGER.info("Error count: " + theMessageChannel.getSendErrorCount());
    }

Note that:

  • No message channel metrics object is neither created nor configured on the message channel.
    Instead we will rely on the default message channel metrics object that is created as part of creating the message channel.
  • Simple statistics are enable on the message channel object using the setCountsEnabled method.
  • The log statements attempting to retrieve metrics directory from the message channel are identical with the ones in the full message channel statistics example above.
  • There is a bug, INT-4373, in Spring Integration 5.0.0 and earlier that will cause the error count to be incremented when simple statistics is enabled despite no errors having occurred.

Running the test, the following lines may be observed in the console:

2017-12-27 13:03:01.838  INFO 1021 --- [           main] s.i.s.m.MessageChannelsCommonTests       : *** Metrics from the message channel:
2017-12-27 13:03:01.838  INFO 1021 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Send duration: [N=0, min=0.000000, max=0.000000, mean=0.000000, sigma=0.000000]
2017-12-27 13:03:01.839  INFO 1021 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Error rate: [N=0, min=0.000000, max=0.000000, mean=0.000000, sigma=0.000000]
2017-12-27 13:03:01.839  INFO 1021 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Send rate: [N=0, min=0.000000, max=0.000000, mean=0.000000, sigma=0.000000]
2017-12-27 13:03:01.839  INFO 1021 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Mean send rate: 0.0
2017-12-27 13:03:01.839  INFO 1021 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Mean error rate: 0.0
2017-12-27 13:03:01.839  INFO 1021 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Mean error ratio: 0.0
2017-12-27 13:03:01.840  INFO 1021 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Min send duration: 0.0
2017-12-27 13:03:01.840  INFO 1021 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Mean send duration: 0.0
2017-12-27 13:03:01.840  INFO 1021 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Max send duration: 0.0
2017-12-27 13:03:01.840  INFO 1021 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Send count: 40
2017-12-27 13:03:01.840  INFO 1021 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Error count: 0

It can be seen that all the rate and duration numbers are now zero. The only number that is not zero is the send count. The error count is also zero, but would have been non-zero if an error would have occurred sending a message to the message channel.

Message Channel Logging

Spring Integration allows for turning on logging from message channels that will log each message being sent to the message channel before and after the send operation. Turning on this type of logging requires two steps; set the log-level for either a specific type of message channel or for all message channels to debug level and enable logging for the particular message channel that is to log when it is created.

In the example project, I have used Logback for logging and the logback-test.xml file is the appropriate place to set the log-level to debug. The following line is to be added inside the element:


The example test, again in from the MessageChannelsCommonTests class, looks like this:

    /**
     * Tests enabling logging for a message channel.
     * Note that logging for either the package org.springframework.integration.channel
     * or for the specific message channel type must be enabled at debug level or lower
     * in order for log to be written.
     *
     * Expected result: Information from the message channel will be logged to the console.
     */
    @Test
    public void loggingTest() {
        final AbstractMessageChannel theMessageChannel;
        final List theSubscriberReceivedMessages =
            new CopyOnWriteArrayList();
 
        /* Create the message channel. */
        theMessageChannel = new DirectChannel();
        theMessageChannel.setComponentName(DIRECT_CHANNEL_NAME);
 
        /* Enable logging for the message channel. */
        theMessageChannel.setLoggingEnabled(true);
 
        /*
         * Create a subscriber (message handler) that adds each received message
         * to a list. Register the subscriber with the subscribable message channel.
         */
        final MessageHandler theSubscriber = theSubscriberReceivedMessages::add;
        ((DirectChannel)theMessageChannel).subscribe(theSubscriber);
 
        sendSomeMessagesToMessageChannelWithRandomDelay(theMessageChannel);
 
        await().atMost(2, TimeUnit.SECONDS).until(() ->
            theSubscriberReceivedMessages.size() >= METRICSTEST_MESSAGE_COUNT);
 
        /* No verification of the log output is made. */
    }

Note that:

  • After the message channel has been created and its name set, logging is enabled using the setLoggingEnabled method.
  • There are no assertions verifying the outcome.
    The result is log output, in this case to the console, as we are about to see.

If the test is run, the following log output can be observed in the console:

2017-12-29 19:49:47.206  INFO 2253 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Started MessageChannelsCommonTests in 2.023 seconds (JVM running for 3.209)
2017-12-29 19:49:47.436  INFO 2253 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'MyDirectChannel' has 1 subscriber(s).
2017-12-29 19:49:47.440 DEBUG 2253 --- [           main] o.s.integration.channel.DirectChannel    : preSend on channel '[email protected]', message: GenericMessage [payload=0, headers={id=e6e64419-0610-de25-e3d0-7dee6808f8ea, timestamp=1514573387440}]
2017-12-29 19:49:47.441 DEBUG 2253 --- [           main] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel '[email protected]', message: GenericMessage [payload=0, headers={id=e6e64419-0610-de25-e3d0-7dee6808f8ea, timestamp=1514573387440}]
...
2017-12-29 19:49:49.522 DEBUG 2253 --- [           main] o.s.integration.channel.DirectChannel    : preSend on channel '[email protected]', message: GenericMessage [payload=39, headers={id=c393cf1c-81c4-dc78-f7d3-a7b356520ee9, timestamp=1514573389521}]
2017-12-29 19:49:49.522 DEBUG 2253 --- [           main] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel '[email protected]', message: GenericMessage [payload=39, headers={id=c393cf1c-81c4-dc78-f7d3-a7b356520ee9, timestamp=1514573389521}]

Some output has been removed to reduce the amount of text.

We can see that for each message sent to the message channel, two lines of log are produced

The first one is written before the message is sent:

2017-12-29 19:49:49.522 DEBUG 2253 --- [           main] o.s.integration.channel.DirectChannel    : preSend on channel '[email protected]', message: GenericMessage [payload=39, headers={id=c393cf1c-81c4-dc78-f7d3-a7b356520ee9, timestamp=1514573389521}]

The second line of log is produced after the message has been sent and no exceptions occurred in connection to sending the message:

2017-12-29 19:49:49.522 DEBUG 2253 --- [           main] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel '[email protected]', message: GenericMessage [payload=39, headers={id=c393cf1c-81c4-dc78-f7d3-a7b356520ee9, timestamp=1514573389521}]

The following information is logged:

  • Whether the logging is before(preSend) or after(postSend) the sending of the message.
  • The channel to which the message is sent.
    In the output above it is [email protected]
  • In the case of postSend, the boolean result of the sending is logged.
    This boolean indicates whether the sending was successful.
  • The type of the message sent to the message channel.
    In the log above, the type of message is GenericMessage.
  • The payload of the message.
    The headers of the message.

Accepted Datatypes in Message Channels

In the following examples the ability to restrict the message payload datatypes accepted by a message channel to one or more types (classes).

In addition an example will show how a message converter may be registered with a message channel. Such a message converter will attempt to convert any type not readily accepted by a message channel into a type that is accepted.

Datatype Match

The first example shows a success scenario; a message channel is configured to accept only message payloads of the type Long and a message containing a long integer is sent to the message channel.

    /**
     * Tests creating a message channel and restrict the data types accepted
     * by the message channel to long integers. A message with a long integer payload
     * is then sent to the message channel.
     * Note that no message converters are registered on the message channel!
     *
     * Expected result: The message should be successfully sent to the message channel
     * and consumed by the consumer.
     */
    @Test
    public void restrictDataTypesAllowedTypeTest() {
        final AbstractMessageChannel theMessageChannel;
        final Message theInputMessage;
        final List theSubscriberReceivedMessages = new CopyOnWriteArrayList();
 
        theInputMessage = MessageBuilder.withPayload(new Long(1337)).build();
 
        theMessageChannel = new DirectChannel();
        theMessageChannel.setComponentName(DIRECT_CHANNEL_NAME);
 
        /* Set the datatype(s) that may be passed through the message channel. */
        theMessageChannel.setDatatypes(Long.class);
 
        /*
         * Create a subscriber (message handler) that adds each received message
         * to a list and register the subscriber with the message channel.
         */
        final MessageHandler theSubscriber = theSubscriberReceivedMessages::add;
        ((DirectChannel)theMessageChannel).subscribe(theSubscriber);
 
        theMessageChannel.send(theInputMessage);
 
        await().atMost(2, TimeUnit.SECONDS).until(() -> theSubscriberReceivedMessages.size() > 0);
 
        /* Verify that the subscriber has received a message. */
        Assert.assertTrue("A single message should have been received",
            theSubscriberReceivedMessages.size() == 1);
 
        LOGGER.info("Message received: " + theSubscriberReceivedMessages.get(0));
    }

Note that:

  • The type of the variable theInputMessage is Message .
    This type was chosen in order to allow for any type of payload in the input message.
  • Having created the message channel, it is configured to accept only message payloads of the type Long using the setDatatypes method.
    This method takes a variable number of arguments, so in order to configure a message channel to accept more than one payload type just add further type parameters.

If we run the test, it should pass and the message, with the long integer payload, should be logged to the console:

2017-12-28 07:26:36.047  INFO 1037 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Message received: GenericMessage [payload=1337, headers={id=3abfbe57-d124-120c-282b-33e11b33a55f, timestamp=1514442395930}]

Datatype Mismatch

The second example shows how to configure a message channel to only accept message payloads of the type Long and what happens when an attempt to send a message containing a string payload is made.

    /**
     * Tests creating a message channel and restrict the data types accepted
     * by the message channel to long integers.
     * Note that no message converters are registered on the message channel!
     *
     * Expected result: An exception should be thrown which contains an error message
     * saying that the message channel does not accepts strings, but only objects of
     * the type {@code Long}.
     */
    @Test(expected = MessageDeliveryException.class)
    public void restrictDataTypesNotAllowedTypeTest() {
        final AbstractMessageChannel theMessageChannel;
        final Message theInputMessage;
        final List theSubscriberReceivedMessages = new CopyOnWriteArrayList();
 
        theInputMessage = MessageBuilder.withPayload(NUMBER_STRING).build();
 
        theMessageChannel = new DirectChannel();
        theMessageChannel.setComponentName(DIRECT_CHANNEL_NAME);
 
        /* Set the datatype(s) that may be passed through the message channel. */
        theMessageChannel.setDatatypes(Long.class);
 
        /*
         * Create a subscriber (message handler) that adds each received message
         * to a list and register the subscriber with the message channel.
         */
        final MessageHandler theSubscriber = theSubscriberReceivedMessages::add;
        ((DirectChannel)theMessageChannel).subscribe(theSubscriber);
 
        /*
         * An exception is expected when sending the message to the message channel
         * and the exception is to be logged to the console.
         */
        try {
            theMessageChannel.send(theInputMessage);
            Assert.fail("An exception should be thrown");
        } catch (final Exception theException) {
            LOGGER.info("Exception thrown when sending message with payload type not allowed",
                theException);
        }
    }

Note that:

  • The type of the variable theInputMessage is Message .
    This type was chosen in order to allow for any type of payload in the input message.
  • Having created the message channel, it is configured to accept only message payloads of the type Long using the setDatatypes method.
  • The statement sending the message to the message channel is surrounded by a try-catch block.
    This was done in order to be able to log the exception to the console.

If the test is run it should pass and output similar to the following should be logged to the console:

2017-12-28 07:30:55.491  INFO 1048 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Exception thrown when sending message with payload type not allowed
 
org.springframework.messaging.MessageDeliveryException: Channel 'MyDirectChannel' expected one of the following datataypes [class java.lang.Long], but received [class java.lang.String]
	at org.springframework.integration.channel.AbstractMessageChannel.convertPayloadIfNecessary(AbstractMessageChannel.java:495)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:422)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:388)
	at se.ivankrizsan.springintegration.messagechannels.MessageChannelsCommonTests.restrictDataTypesNotAllowedTypeTest(MessageChannelsCommonTests.java:404)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:73)
	at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:83)
	at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
	at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
	at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
	at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

There is an excellent error message telling us that the message channel MyDirectChannel expected only Long datatypes but received a String .

Datatype Conversion

The last example related to message channels and accepted datatypes will show how to configure a message converter on a message channel, in order to handle data-types not readily accepted by the message channel but which can be converted into a datatype that is accepted.

    /**
     * Tests creating a message channel and restrict the data types accepted
     * by the message channel to long integers.
     * A message converter is also registered on the message channel.
     *
     * Expected result: The subscriber should receive a message that contains
     * a long integer object holding the number from the input message string.
     */
    @Test
    public void restrictMessageChannelDataTypesWithMessageConverterTest() {
        final AbstractMessageChannel theMessageChannel;
        final Message theInputMessage;
        final List theSubscriberReceivedMessages = new CopyOnWriteArrayList();
        final GenericMessageConverter theMessageConverter;
 
        theInputMessage = MessageBuilder.withPayload(NUMBER_STRING).build();
 
        theMessageChannel = new PublishSubscribeChannel();
        theMessageChannel.setComponentName(PUBSUB_CHANNEL_NAME);
 
        /* Set the datatype(s) that may be passed through the message channel. */
        theMessageChannel.setDatatypes(Long.class);
 
        /*
         * Set the message converter that will be used to attempt to convert messages
         * of a data type that is not supported by the message channel.
         * In this case the generic message converter is used. This message converter
         * delegates message conversion to either the default conversion service or a
         * conversion service supplied when the message converter is created.
         */
        theMessageConverter = new GenericMessageConverter();
        theMessageChannel.setMessageConverter(theMessageConverter);
 
        /*
         * Create a subscriber (message handler) that adds each received message
         * to a list and register the subscriber with the message channel.
         */
        final MessageHandler theSubscriber = theSubscriberReceivedMessages::add;
        ((PublishSubscribeChannel)theMessageChannel).subscribe(theSubscriber);
 
        theMessageChannel.send(theInputMessage);
 
        await().atMost(2, TimeUnit.SECONDS).until(() -> theSubscriberReceivedMessages.size() > 0);
 
        Assert.assertTrue("A single message should have been received",
            theSubscriberReceivedMessages.size() == 1);
 
        final Message theOutputMessage = theSubscriberReceivedMessages.get(0);
        final Object theOutputPayload = theOutputMessage.getPayload();
 
        LOGGER.info("Message received: " + theOutputMessage);
        Assert.assertEquals("Output should contain the number from the"
                + " input string and be a long integer",
            NUMBER_VALUE,
            theOutputPayload);
    }

Note that:

  • The type of the variables theInputMessage and theOutputMessage are Message .
    This type was chosen in order to allow for any type of payload in the messages.
  • An input message is created with the payload being a string containing the number “1234567”.
  • Having created the message channel, it is configured to accept only message payloads of the type Long using the setDatatypes method.
  • A message converter is created and the message channel is configured to use this message converter using the setMessageConverter method.
    In the example the GenericMessageConverter available in Spring Integration is used, which will delegate any type conversion to the default conversion service.

If the test is run, it should pass and the following should be output to the console:

2017-12-28 08:13:57.689  INFO 1490 --- [           main] s.i.s.m.MessageChannelsCommonTests       : Message received: GenericMessage [payload=1234567, headers={id=2f7be45a-b72f-18ab-90bf-72cff66fc884, timestamp=1514445237572}]

Message Channel Interceptors

Message channel interceptors are similar to around advice in aspect-oriented programming and the intercepting filter JavaEE pattern. For those familiar with Java servlets, an interceptor is very similar to a servlet filter .

In other words, a message channel interceptor intercepts sending and/or receiving of messages from a message channel at certain locations allowing for modification of messages. An interceptor can even stop the sending of a message to or receiving a message from a message channel from happening.

All channel interceptors in Spring Integration implement the ChannelInterceptor interface that contain the following methods:

  • Message preSend(Message message, MessageChannel channel)
    Invoked before a message is being sent to a message channel.
    Allows for returning a modified message, or null to stop sending message to channel.
  • void postSend(Message message, MessageChannel channel, boolean sent)
    Invoked after a message has been sent to a message channel. The boolean indicates whether message was successfully sent.
  • void afterSendCompletion(Message message, MessageChannel channel, boolean sent, Exception ex)
    Invoked after the sending of a message to a message channel has completed, regardless of any exception thrown during the send operation.
  • boolean preReceive(MessageChannel channel)
    Invoked before trying to receive a message from a pollable message channel. No message will be retrieved if this method returns false.
  • Message postReceive(Message message, MessageChannel channel)
    Invoked after a message has been received from a pollable message channel. Allows for returning a modified message, or null to stop any further interceptors from being called.
  • void afterReceiveCompletion(@Nullable Message message, MessageChannel channel, Exception ex)
    Invoked after the receiving of a message from a pollable message channel has completed, regardless of any exceptions thrown during the receive operation.

The following figure illustrates the order in which the different methods in a ChannelInterceptor are invoked when sending to and receiving from a pollable message channel. A pollable message channel has been chosen in order for all the methods of the interceptor to be invoked.

Sending and receiving from a pollable message channel with a channel interceptor.

Logging and Counting Channel Interceptor

In order to show a code example with a channel interceptor, let’s implement an interceptor first. This channel interceptor will keep a count for each of the different methods in the ChannelInterceptor interface and also log the calls to the methods.

package se.ivankrizsan.springintegration.channelinterceptors.helpers;
 
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
 
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * Message channel interceptor that logs information about the messages being
 * sent and received to/from the channel.
 * In addition, a count for each of the different intercepting points is
 * maintained counting the number of received messages.
 *
 * @author Ivan Krizsan
 */
public class LoggingAndCountingChannelInterceptor implements ChannelInterceptor {
    protected static final Log LOGGER =
        LogFactory.getLog(LoggingAndCountingChannelInterceptor.class);
 
    /* Instance variable(s): */
    protected AtomicInteger mPreSendMessageCount = new AtomicInteger();
    protected AtomicInteger mPostSendMessageCount = new AtomicInteger();
    protected AtomicInteger mAfterSendCompletionMessageCount = new AtomicInteger();
    protected AtomicInteger mPreReceiveMessageCount = new AtomicInteger();
    protected AtomicInteger mPostReceiveMessageCount = new AtomicInteger();
    protected AtomicInteger mAfterReceiveCompletionMessageCount = new AtomicInteger();
 
    @Override
    public Message preSend(final Message inMessage,
        final MessageChannel inChannel) {
        logMessageWithChannelAndPayload("Before message sent.",
            inMessage,
            inChannel,
            (Object[]) null);
        mPreSendMessageCount.incrementAndGet();
        return inMessage;
    }
 
    @Override
    public void postSend(final Message inMessage, final MessageChannel inChannel,
        final boolean inSent) {
        logMessageWithChannelAndPayload("After message sent.",
            inMessage,
            inChannel,
            (Object[]) null);
        mPostSendMessageCount.incrementAndGet();
    }
 
    @Override
    public void afterSendCompletion(final Message inMessage,
        final MessageChannel inChannel,
        final boolean inSent,
        final Exception inException) {
        logMessageWithChannelAndPayload(
            "After completion of message sending. Exception: {0}.",
            inMessage,
            inChannel,
            inException);
        mAfterSendCompletionMessageCount.incrementAndGet();
    }
 
    @Override
    public boolean preReceive(final MessageChannel inChannel) {
        /* Only applies to pollable channels. */
        logMessageWithChannelAndPayload("Pre-receive.",
            null,
            inChannel,
            (Object[]) null);
        mPreReceiveMessageCount.incrementAndGet();
 
        /* Returning true means go ahead with the receive. */
        return true;
    }
 
    @Override
    public Message postReceive(final Message inMessage,
        final MessageChannel inChannel) {
        /* Only applies to pollable channels. */
        logMessageWithChannelAndPayload("Post-receive.",
            null,
            inChannel,
            (Object[]) null);
        mPostReceiveMessageCount.incrementAndGet();
        return inMessage;
    }
 
    @Override
    public void afterReceiveCompletion(final Message inMessage,
        final MessageChannel inChannel,
        final Exception inException) {
        LOGGER.info(
            "After message receive completion. Channel: " + inChannel.toString()
                + " Payload: " + inMessage.getPayload()
                + " Exception: " + inException);
        mAfterReceiveCompletionMessageCount.incrementAndGet();
    }
 
    public int getPreSendMessageCount() {
        return mPreSendMessageCount.get();
    }
 
    public int getPostSendMessageCount() {
        return mPostSendMessageCount.get();
    }
 
    public int getAfterSendCompletionMessageCount() {
        return mAfterSendCompletionMessageCount.get();
    }
 
    public int getPreReceiveMessageCount() {
        return mPreReceiveMessageCount.get();
    }
 
    public int getPostReceiveMessageCount() {
        return mPostReceiveMessageCount.get();
    }
 
    public int getAfterReceiveCompletionMessageCount() {
        return mAfterReceiveCompletionMessageCount.get();
    }
 
    /**
     * Logs a Spring Integration message consisting of the supplied Spring
     * Integration message channel and payload of the supplied message with the
     * supplied log message appended.
     * The supplied log message can contain placeholders as specified by
     * {@code MessageFormat} which will be replaced by the string representation
     * of supplied objects.
     *
     * @param inLogMessage String containing log message with optional placeholders.
     * @param inMessage Spring Integration message which payload to log. May be null.
     * @param inMessageChannel Spring Integration message channel which to log.
     * May be null.
     * @param inAdditionalInMessage Objects which string representation(s) are to
     * be inserted into log message, or null.
     */
    protected void logMessageWithChannelAndPayload(final String inLogMessage,
        final Message inMessage,
        final MessageChannel inMessageChannel,
        final Object... inAdditionalInMessage) {
        if (LOGGER.isInfoEnabled()) {
            final int theAppendMsgParamsStartIndex =
                (inAdditionalInMessage != null) ? inAdditionalInMessage.length : 0;
 
            String theLogMessage =
                new StringBuilder().append(inLogMessage)
                    .append(" Channel: {")
                    .append(theAppendMsgParamsStartIndex)
                    .append("}. Payload: {")
                    .append(theAppendMsgParamsStartIndex + 1)
                    .append("}")
                    .toString();
 
            final Object[] theLogMessageParameters;
            if (inAdditionalInMessage != null) {
                theLogMessageParameters = Arrays.copyOf(inAdditionalInMessage,
                    inAdditionalInMessage.length + 2);
            } else {
                theLogMessageParameters = new Object[2];
            }
 
            theLogMessageParameters[theAppendMsgParamsStartIndex] =
                (inMessageChannel != null)
                    ? inMessageChannel.toString() : "null message channel";
            theLogMessageParameters[theAppendMsgParamsStartIndex + 1] =
                (inMessage != null) ? inMessage.getPayload()
                    : "null message";
            theLogMessage =
                MessageFormat.format(theLogMessage, theLogMessageParameters);
            LOGGER.info(theLogMessage);
        }
    }
}

Nothing surprising about the above class; it implements the ChannelInterceptor interface and a method that logs a message. To keep track of the different counts, instances of AtomicInteger have been used in order to make the counting thread-safe.

Message Channel Interceptor Example

Using the logging and counting channel interceptor above, an example showing how to use message channel interceptors in Spring Integration can now be developed. The example can be found in the MessageChannelCommonTests class:

    /**
     * Tests adding an interceptor for a message channel and send some messages
     * to the message channel.
     * While one or more interceptors can be added to all types of message channels,
     * different types of message channels invoke different sets of methods on the
     * interceptors. Further examples can be found elsewhere.
     *
     * Expected result: The interceptor's preSend, postSend and afterSendCompletion
     * should be invoked once for every message sent.
     */
    @Test
    public void interceptorsTest() {
        final AbstractMessageChannel theMessageChannel;
        final LoggingAndCountingChannelInterceptor theLoggingAndCountingChannelInterceptor;
        final List theSubscriberReceivedMessages = new CopyOnWriteArrayList();
 
        /* Create the message channel. */
        theMessageChannel = new DirectChannel();
        theMessageChannel.setComponentName(DIRECT_CHANNEL_NAME);
 
        /* Create a channel interceptor and add it to the interceptors of the message channel. */
        theLoggingAndCountingChannelInterceptor = new LoggingAndCountingChannelInterceptor();
        theMessageChannel.addInterceptor(theLoggingAndCountingChannelInterceptor);
 
        /*
         * Create a subscriber (message handler) that adds each received message
         * to a list. Register the subscriber with the message channel.
         */
        final MessageHandler theSubscriber = theSubscriberReceivedMessages::add;
        ((DirectChannel)theMessageChannel).subscribe(theSubscriber);
 
        sendSomeMessagesToMessageChannelWithRandomDelay(theMessageChannel);
 
        await().atMost(2, TimeUnit.SECONDS).until(() ->
            theSubscriberReceivedMessages.size() >= METRICSTEST_MESSAGE_COUNT);
 
        /*
         * The interceptor's preSend, postSend and afterSendCompletion should have been
         * invoked once for every message sent.
         */
        Assert.assertEquals(
            "Interceptor preSend method should have been invoked once for every message",
            METRICSTEST_MESSAGE_COUNT,
            theLoggingAndCountingChannelInterceptor.getPreSendMessageCount());
        Assert.assertEquals(
            "Interceptor postSend method should have been invoked once for every message",
            METRICSTEST_MESSAGE_COUNT,
            theLoggingAndCountingChannelInterceptor.getPostSendMessageCount());
        Assert.assertEquals(
            "Interceptor afterSendCompletion method should have been invoked once for"
            + " every message",
            METRICSTEST_MESSAGE_COUNT,
            theLoggingAndCountingChannelInterceptor.getAfterSendCompletionMessageCount());
    }

Note that:

  • Having created the message channel, an instance of the LoggingAndCountingChannelInterceptor is created.
  • The message channel interceptor is added to the interceptors of the message channel using the addInterceptor method.
    It is possible to add multiple interceptors to a message channel and, if using the addInterceptor method that takes a single parameter, the last added interceptor will also be the interceptor that is invoked last in the chain of interceptors.
    There is also an addInterceptor method that takes two parameters; one index and a channel interceptor instance. This method will add the interceptor at the supplied index in the interceptor list.
  • Finally there are assertions verifying that each of the preSend , postSend and afterSendCompletion counters in the interceptor does match the number of messages sent to the message channel.
    As in the assertion message strings, each of these methods are expected to be invoked once for each message sent to the message channel.

If the example is run, the test should pass and for each message there should be three lines of log output written to the console:

2017-12-31 18:00:42.616  INFO 1457 --- [main] c.h.LoggingAndCountingChannelInterceptor : Before message sent. Channel: [email protected] Payload: 34
2017-12-31 18:00:42.616  INFO 1457 --- [main] c.h.LoggingAndCountingChannelInterceptor : After message sent. Channel: [email protected] Payload: 34
2017-12-31 18:00:42.616  INFO 1457 --- [main] c.h.LoggingAndCountingChannelInterceptor : After completion of message sending. Exception: null. Channel: [email protected] Payload: 34

We can see that the three methods preSend , postSend and afterSendCompletion in the message channel interceptor are indeed invoked once for each message.

This concludes this first article on Spring Integration message channels!

Happy coding!

Ivan Krizsan稿源:Ivan Krizsan (源链) | 关于 | 阅读提示

本站遵循[CC BY-NC-SA 4.0]。如您有版权、意见投诉等问题,请通过eMail联系我们处理。
酷辣虫 » 综合编程 » Spring Integration 5 – Message Channels, Introduction and Common Properties

喜欢 (0)or分享给?

专业 x 专注 x 聚合 x 分享 CC BY-NC-SA 4.0

使用声明 | 英豪名录