Recipient List

The Recipient List from the EIP patterns allows you to route messages to a number of dynamically specified recipients.

Each recipient receives a copy of the same Exchange, and by default Camel processes the recipients sequentially.

Options

Name | Default Value | Description
delimiter | , | Delimiter used if the Expression returned multiple endpoints.
strategyRef | | Refers to an AggregationStrategy to be used to assemble the replies from the recipients into a single outgoing message from the Recipient List. By default Camel will use the last reply as the outgoing message.
parallelProcessing | false | Camel 2.2: If enabled, sending messages to the recipients occurs concurrently. Note that the caller thread will still wait until all messages have been fully processed before it continues. It is only the sending and the processing of the replies from the recipients that happens concurrently.
executorServiceRef | | Camel 2.2: Refers to a custom Thread Pool to be used for parallel processing. Notice that if you set this option, parallel processing is automatically implied, and you do not have to enable that option as well.
stopOnException | false | Camel 2.2: Whether to stop processing immediately when an exception occurs. If disabled, Camel will send the message to all recipients regardless of whether one of them failed. You can deal with exceptions in the AggregationStrategy class, where you have full control over how to handle them.
ignoreInvalidEndpoints | false | Camel 2.3: Whether an endpoint URI that could not be resolved should be ignored. Otherwise Camel will throw an exception stating that the endpoint URI is not valid.
streaming | false | Camel 2.5: If enabled, Camel will process replies out of order, i.e. in the order they come back. If disabled, Camel will process replies in the same order as the Expression specified.
timeout | | Camel 2.5: Sets a total timeout specified in millis. If the Recipient List hasn't been able to send and process all replies within the given timeframe, then the timeout triggers and the Recipient List breaks out and continues. Notice that if you provide a TimeoutAwareAggregationStrategy, the timeout method is invoked before breaking out.
onPrepareRef | | Camel 2.8: Refers to a custom Processor to prepare the copy of the Exchange each recipient will receive. This allows you to apply any custom logic, such as deep-cloning the message payload if that is needed.
shareUnitOfWork | false | Camel 2.8: Whether the unit of work should be shared. See the same option on Splitter for more details.

Static Recipient List

The following example shows how to route a request from an input seda:a endpoint to a static list of destinations.

Using Annotations
You can use the RecipientList Annotation on a POJO to create a Dynamic Recipient List. For more details see the Bean Integration.

Using the Fluent Builders

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        errorHandler(deadLetterChannel("mock:error"));

        from("seda:a")
            .multicast().to("seda:b", "seda:c", "seda:d");
    }
};

Using the Spring XML Extensions

<camelContext errorHandlerRef="errorHandler" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="seda:a"/>
        <multicast>
            <to uri="seda:b"/>
            <to uri="seda:c"/>
            <to uri="seda:d"/>
        </multicast>
    </route>
</camelContext>

Dynamic Recipient List

Usually one of the main reasons for using the Recipient List pattern is that the list of recipients is dynamic and calculated at runtime. The following example demonstrates how to create a dynamic recipient list using an Expression (which in this case extracts a named header value) to calculate the list of endpoints. Each value is either of type Endpoint or is converted to a String and then resolved as an endpoint URI.

Using the Fluent Builders

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        errorHandler(deadLetterChannel("mock:error"));

        from("seda:a")
            .recipientList(header("foo"));
    }
};

The above assumes that the header contains a list of endpoint URIs. The following takes a single string header and tokenizes it:

from("direct:a").recipientList(
        header("recipientListHeader").tokenize(","));

Iterable value

The dynamic list of recipients defined in the header must be iterable, such as:

  • java.util.Collection
  • java.util.Iterator
  • arrays
  • org.w3c.dom.NodeList
  • Camel 1.6.0: a single String with values separated by commas
  • any other type will be regarded as a single value
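The list above can be read as a type dispatch on the header value. The following is only a plain-Java sketch of that idea, not Camel's own code: the class RecipientValues and the method toRecipients are hypothetical names, and the sketch handles object arrays but not primitive arrays.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import org.w3c.dom.NodeList;

/** Hypothetical helper (not part of Camel): flattens a header value into recipients. */
final class RecipientValues {

    private RecipientValues() {
    }

    static List<Object> toRecipients(Object value) {
        List<Object> recipients = new ArrayList<>();
        if (value == null) {
            return recipients;
        }
        if (value instanceof Collection) {
            recipients.addAll((Collection<?>) value);
        } else if (value instanceof Iterator) {
            Iterator<?> it = (Iterator<?>) value;
            while (it.hasNext()) {
                recipients.add(it.next());
            }
        } else if (value instanceof Object[]) {
            // Object arrays only; primitive arrays are left out of this sketch
            recipients.addAll(Arrays.asList((Object[]) value));
        } else if (value instanceof NodeList) {
            NodeList nodes = (NodeList) value;
            for (int i = 0; i < nodes.getLength(); i++) {
                recipients.add(nodes.item(i));
            }
        } else if (value instanceof String) {
            // a single String with values separated by commas
            for (String part : ((String) value).split(",")) {
                recipients.add(part.trim());
            }
        } else {
            // any other type is regarded as a single value
            recipients.add(value);
        }
        return recipients;
    }
}
```

With this sketch, RecipientValues.toRecipients("seda:b, seda:c") yields the two values seda:b and seda:c, while a non-iterable value such as an Integer comes back as a one-element list.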

Using the Spring XML Extensions

<camelContext errorHandlerRef="errorHandler" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="seda:a"/>
        <recipientList>
            <xpath>$foo</xpath>
        </recipientList>
    </route>
</camelContext>

For further examples of this pattern in use, you could look at one of the JUnit test cases.

Using delimiter in Spring XML

Available as of Camel 1.6.0
In Spring DSL you can set the delimiter attribute to define the delimiter to be used if the header value is a single String with multiple separated endpoints. By default Camel uses comma as the delimiter, but this option lets you specify a custom delimiter to use instead.

<route>
  <from uri="direct:a" />
  <!-- use comma as a delimiter for String based values -->
  <recipientList delimiter=",">
    <header>myHeader</header>
  </recipientList>
</route>

So if myHeader contains a String with the value "activemq:queue:foo, activemq:topic:hello , log:bar", Camel will split the String using the delimiter given in the XML (a comma here), resulting in 3 endpoints to send to. You can use spaces between the endpoints, as Camel will trim each value when it looks up the endpoint to send to.
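The split-and-trim behavior described above can be illustrated in plain Java. This is a sketch under assumptions, not Camel's implementation: DelimiterSketch and split are hypothetical names, and skipping empty entries is a choice made here for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of Camel): splits a header value into endpoint URIs. */
final class DelimiterSketch {

    private DelimiterSketch() {
    }

    static List<String> split(String headerValue, String delimiter) {
        List<String> uris = new ArrayList<>();
        // quote the delimiter so characters such as "|" or "." are taken literally
        for (String part : headerValue.split(Pattern.quote(delimiter))) {
            String uri = part.trim();
            if (!uri.isEmpty()) {
                // assumption of this sketch: empty entries are skipped
                uris.add(uri);
            }
        }
        return uris;
    }
}
```

Calling DelimiterSketch.split("activemq:queue:foo, activemq:topic:hello , log:bar", ",") gives the three trimmed URIs from the example above.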

Note: In Java DSL you use the tokenizer to achieve the same. The route above in Java DSL:

    from("direct:a").recipientList(header("myHeader").tokenize(","));

In Camel 2.1 it is a bit easier, as you can pass in the delimiter as the 2nd parameter:

    from("direct:a").recipientList(header("myHeader"), "#");

Sending to multiple recipients in parallel

Available as of Camel 2.2

The Recipient List now supports parallelProcessing, which the Splitter, for example, also supports. You can use it to have a thread pool send the Exchange to multiple recipients concurrently.

    from("direct:a").recipientList(header("myHeader")).parallelProcessing();

And in Spring XML it is an attribute on the recipient list tag.

   <route>
       <from uri="direct:a"/>
       <recipientList parallelProcessing="true">
           <header>myHeader</header>
       </recipientList>
   </route>
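To show what "concurrent, but the caller still waits" means in practice, here is a plain-Java sketch using java.util.concurrent. It does not use Camel at all: ParallelSendSketch and sendToAll are hypothetical names, the "send" is simulated by building a String, and the pool size of 10 simply mirrors the default mentioned on this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical illustration (not Camel code) of sending to recipients concurrently. */
final class ParallelSendSketch {

    private ParallelSendSketch() {
    }

    static List<String> sendToAll(List<String> recipients, String body) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String uri : recipients) {
                // simulated send: each task returns a fake reply
                tasks.add(() -> uri + " got " + body);
            }
            List<String> replies = new ArrayList<>();
            // invokeAll blocks the calling thread until every task has completed
            for (Future<String> reply : pool.invokeAll(tasks)) {
                replies.add(reply.get());
            }
            return replies;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The tasks run on pool threads, yet sendToAll only returns once all replies are in, which is the same contract the parallelProcessing option describes for the caller thread.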

Stop continuing in case one recipient failed

Available as of Camel 2.2

The Recipient List now supports stopOnException, which the Splitter, for example, also supports. You can use it to stop sending to any further recipients if a recipient fails.

    from("direct:a").recipientList(header("myHeader")).stopOnException();

And in Spring XML it is an attribute on the recipient list tag.

   <route>
       <from uri="direct:a"/>
       <recipientList stopOnException="true">
           <header>myHeader</header>
       </recipientList>
   </route>

Note: You can combine parallelProcessing and stopOnException and have them both true.
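The effect of stopOnException on sequential delivery can be sketched in plain Java. This is an illustration only, not Camel code: StopOnExceptionSketch and send are hypothetical names, and a Predicate stands in for "this recipient throws an exception".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical illustration (not Camel code) of the stopOnException flag. */
final class StopOnExceptionSketch {

    private StopOnExceptionSketch() {
    }

    /** Returns the recipients that were attempted, in order. */
    static List<String> send(List<String> recipients, Predicate<String> fails, boolean stopOnException) {
        List<String> attempted = new ArrayList<>();
        for (String uri : recipients) {
            attempted.add(uri);
            if (fails.test(uri) && stopOnException) {
                // stop immediately; later recipients never see the message
                break;
            }
            // with the flag disabled, continue regardless of the failure
        }
        return attempted;
    }
}
```

With recipients direct:a, direct:b and direct:c, and direct:b failing, the sketch attempts only the first two when the flag is true and all three when it is false.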

Ignore invalid endpoints

Available as of Camel 2.3

The Recipient List now supports ignoreInvalidEndpoints, which the Routing Slip also supports. You can use it to skip endpoints that are invalid.

    from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints();

And in Spring XML it is an attribute on the recipient list tag.

   <route>
       <from uri="direct:a"/>
       <recipientList ignoreInvalidEndpoints="true">
           <header>myHeader</header>
       </recipientList>
   </route>

Then let's say that myHeader contains the following two endpoints: direct:foo,xxx:bar. The first endpoint is valid and works. However, the 2nd is invalid and will just be ignored. Camel logs this at INFO level, so you can see why the endpoint was invalid.
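The difference between ignoring and rejecting an invalid endpoint can be sketched in plain Java. This is not how Camel resolves endpoints: InvalidEndpointSketch and resolve are hypothetical names, and a set of known URI schemes stands in for Camel's component lookup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration (not Camel code) of the ignoreInvalidEndpoints flag. */
final class InvalidEndpointSketch {

    private InvalidEndpointSketch() {
    }

    static List<String> resolve(List<String> uris, Set<String> knownSchemes, boolean ignoreInvalidEndpoints) {
        List<String> resolved = new ArrayList<>();
        for (String uri : uris) {
            int colon = uri.indexOf(':');
            String scheme = colon < 0 ? uri : uri.substring(0, colon);
            if (knownSchemes.contains(scheme)) {
                resolved.add(uri);
            } else if (!ignoreInvalidEndpoints) {
                // flag disabled: fail fast on the first unresolvable URI
                throw new IllegalArgumentException("Endpoint uri is not valid: " + uri);
            }
            // flag enabled: the invalid URI is silently skipped in this sketch
        }
        return resolved;
    }
}
```

For the header value above, with only the direct scheme known, the sketch keeps direct:foo and drops xxx:bar when the flag is true, and throws when it is false.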

Using custom AggregationStrategy

Available as of Camel 2.2

You can now use your own AggregationStrategy with the Recipient List, although it is not often needed. It is useful when you are using Request Reply messaging, because the replies from the recipients can then be aggregated. By default Camel uses UseLatestAggregationStrategy, which just keeps the last received reply. If you must remember the bodies that all the recipients send back, you can use your own custom aggregator that keeps them. It is the same principle as with the Aggregator EIP, so check it out for details.

    from("direct:a")
        .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy())
        .to("direct:b");

And in Spring XML it is an attribute on the recipient list tag.

   <route>
       <from uri="direct:a"/>
       <recipientList strategyRef="myStrategy">
           <header>myHeader</header>
       </recipientList>
       <to uri="direct:b"/>
   </route>

   <bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>
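The difference between keeping only the latest reply and keeping all of them can be sketched without Camel. In the plain-Java illustration below, ReplyAggregationSketch, USE_LATEST and KEEP_ALL are hypothetical names, reply bodies are plain Strings, and a BinaryOperator stands in for an AggregationStrategy. As in the aggregate method shown elsewhere on this page, the "old" value is null on the first call.

```java
import java.util.List;
import java.util.function.BinaryOperator;

/** Hypothetical illustration (not Camel code) of pairwise reply aggregation. */
final class ReplyAggregationSketch {

    /** Mimics "use latest": the newest reply always wins. */
    static final BinaryOperator<String> USE_LATEST = (oldReply, newReply) -> newReply;

    /** Keeps every reply body, joined with commas. */
    static final BinaryOperator<String> KEEP_ALL =
        (oldReply, newReply) -> oldReply == null ? newReply : oldReply + "," + newReply;

    private ReplyAggregationSketch() {
    }

    static String aggregate(List<String> replies, BinaryOperator<String> strategy) {
        String result = null;
        for (String reply : replies) {
            // the strategy sees the running result and the next reply
            result = strategy.apply(result, reply);
        }
        return result;
    }
}
```

For replies A, B and C, the USE_LATEST variant ends with C, while KEEP_ALL ends with A,B,C.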

Using custom thread pool

Available as of Camel 2.2

A thread pool is only used for parallelProcessing. You supply your own custom thread pool via the ExecutorServiceStrategy (see Camel's Threading Model), the same way you would do it for the aggregationStrategy. By default Camel uses a thread pool with 10 threads (subject to change in a future version).

Using method call as recipient list

You can use a Bean to provide the recipients, for example:

from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo");

And then MessageRouter:

public class MessageRouter {

    public String routeTo() {
        String queueName = "activemq:queue:test2";
        return queueName;
    }
}

When you use a Bean, do not also use the @RecipientList annotation, as this will in fact add yet another recipient list, so you end up having two. Do not do this:

public class MessageRouter {

    @RecipientList
    public String routeTo() {
        String queueName = "activemq:queue:test2";
        return queueName;
    }
}

You should only use @RecipientList as shown above if you route directly to a Bean that you then want to act as a recipient list.
So the original route can be changed to:

from("activemq:queue:test").bean(MessageRouter.class, "routeTo");

This would then invoke the routeTo method, detect that it is annotated with @RecipientList, and act accordingly as if it were a recipient list EIP.

Using timeout

Available as of Camel 2.5

If you use parallelProcessing then you can configure a total timeout value in millis. Camel will then process the messages in parallel until the timeout is hit. This allows you to continue processing if one message is slow. For example you can set a timeout value of 20 sec.

For example, in the unit test below we multicast the message to 3 destinations. We have a timeout of 250 milliseconds, which means only the last two messages can be completed within the timeframe, because the first destination is delayed by 1 second. This means we will only aggregate the last two, which yields the aggregated result "BC".

from("direct:start")
    .multicast(new AggregationStrategy() {
            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                if (oldExchange == null) {
                    return newExchange;
                }

                String body = oldExchange.getIn().getBody(String.class);
                oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                return oldExchange;
            }
        })
        .parallelProcessing().timeout(250).to("direct:a", "direct:b", "direct:c")
    // use end to indicate end of multicast route
    .end()
    .to("mock:result");

from("direct:a").delay(1000).to("mock:A").setBody(constant("A"));

from("direct:b").to("mock:B").setBody(constant("B"));

from("direct:c").to("mock:C").setBody(constant("C"));

Timeout in other EIPs
This timeout feature is also supported by Splitter and both multicast and recipientList.

By default, if a timeout occurs the AggregationStrategy is not invoked. However, you can implement the specialized TimeoutAwareAggregationStrategy:

public interface TimeoutAwareAggregationStrategy extends AggregationStrategy {

    /**
     * A timeout occurred
     *
     * @param oldExchange  the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange)
     * @param index        the index
     * @param total        the total
     * @param timeout      the timeout value in millis
     */
    void timeout(Exchange oldExchange, int index, int total, long timeout);
}

This allows you to deal with the timeout in the AggregationStrategy if you really need to.

Timeout is total
The timeout is total, which means that after X time, Camel will aggregate the messages that have completed within the timeframe. The remainder will be cancelled. Camel will also only invoke the timeout method in the TimeoutAwareAggregationStrategy once, for the first index which caused the timeout.
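The notion of a total timeout can be sketched with java.util.concurrent alone. This is not Camel's implementation: TotalTimeoutSketch, reply and aggregateWithin are hypothetical names, replies are simulated with Thread.sleep, and completed replies are concatenated in submission order.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration (not Camel code) of a total timeout over parallel tasks. */
final class TotalTimeoutSketch {

    private TotalTimeoutSketch() {
    }

    /** A simulated recipient that replies with the given body after a delay. */
    static Callable<String> reply(String body, long delayMillis) {
        return () -> {
            Thread.sleep(delayMillis);
            return body;
        };
    }

    static String aggregateWithin(List<Callable<String>> tasks, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            StringBuilder result = new StringBuilder();
            // one deadline for all tasks; unfinished tasks are cancelled on return
            for (Future<String> future : pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS)) {
                if (!future.isCancelled()) {
                    result.append(future.get());
                }
            }
            return result.toString();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Mirroring the unit test on this page, three tasks built with reply("A", 1000), reply("B", 0) and reply("C", 0) and a timeout of 250 milliseconds leave only B and C in the aggregated result, since the slow task is cancelled when the deadline passes.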

Using onPrepare to execute custom logic when preparing messages

Available as of Camel 2.8

See details at Multicast

Using This Pattern

If you would like to use this EIP Pattern, please read the Getting Started guide. You may also find the Architecture useful, particularly the description of Endpoint and URIs. Then you could try out some of the Examples before trying this pattern out.

© 2004-2011 The Apache Software Foundation.
Apache Camel, Camel, Apache, the Apache feather logo, and the Apache Camel project logo are trademarks of The Apache Software Foundation. All other marks mentioned may be trademarks or registered trademarks of their respective owners.