
Spring Integration 5.0 Milestone 2 Available


On behalf of the Spring Integration team, I’d like to announce the second Milestone of Spring Integration 5.0, which is available in the Milestone Repository.

Here are some highlights of this release since the previous Milestone.

But first of all, big thanks to you, the community, for your contributions!

MongoDB Improvements

  • MongoDbOutboundGateway - for performing queries or any arbitrary operation on the collection

  • Initial Java DSL support for MongoDB components

  • The MongoDB components can now use the org.springframework.data.mongodb.core.query.Query API in their expressions

@Bean
public IntegrationFlow mongoDbGatewayFlow() {
    return f -> f
        .handle(MongoDb.outboundGateway(this.mongoTemplate)
                            .collectionCallback(MongoCollection::count)
                            .collectionNameFunction(m ->
                                           m.getHeaders().get("collection")));
}

@MessagingGateway and Java DSL

The Java DSL IntegrationFlow can now start from an interface marked with @MessagingGateway, and every method call on the resulting proxy bean sends a Message to the downstream IntegrationFlow. This lets you omit @IntegrationComponentScan and extra channel configuration. For example, here is a simple gateway for the Control Bus component:

@MessagingGateway
public interface ControlBusGateway {

    void send(String command);

}
...
@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlows.from(ControlBusGateway.class)
                 .controlBus()
                 .get();
}
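
To make the proxy mechanics more concrete, here is a minimal sketch that uses only the JDK's java.lang.reflect.Proxy. It is not how Spring Integration implements @MessagingGateway: the gatewayFor helper and the Consumer standing in for the downstream flow are hypothetical names introduced for illustration, and the sketch assumes void methods whose first argument is the payload.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class GatewayProxySketch {

    /** Same shape as the gateway above, minus the Spring annotation. */
    public interface ControlBusGateway {
        void send(String command);
    }

    /**
     * Hypothetical helper (not a Spring API): returns a proxy for the given
     * interface that forwards the first argument of each call downstream.
     */
    static <T> T gatewayFor(Class<T> type, Consumer<Object> downstream) {
        InvocationHandler handler = (proxy, method, args) -> {
            // equals/hashCode/toString also reach the handler; answer them locally.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "equals":
                        return proxy == args[0];
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default:
                        return type.getSimpleName() + " proxy";
                }
            }
            // Assumption: the first argument is the payload and the method returns void.
            Object payload = (args != null && args.length > 0) ? args[0] : null;
            downstream.accept(payload);
            return null;
        };
        Object instance = Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler);
        return type.cast(instance);
    }

    public static void main(String[] args) {
        List<Object> received = new ArrayList<>();
        ControlBusGateway gateway = gatewayFor(ControlBusGateway.class, received::add);
        gateway.send("@someBean.stop()"); // hypothetical command string
        System.out.println(received);
    }
}
```

The caller only sees the interface; where the payload ends up is decided entirely by whatever is wired behind the proxy, which is the role the downstream IntegrationFlow plays in the real feature.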

Reactive Streams Support

And of course, there is some news on the Reactive Streams front.

The MessageChannelReactiveUtils can be used to adapt any MessageChannel to an org.reactivestreams.Publisher. This is useful when you want to expose upstream data as a Flux, with loosely coupled integration on one side and reactive back pressure on the other:

@Autowired
private PollableChannel queueChannel;
...
Flux.from(MessageChannelReactiveUtils.<String>toPublisher(this.queueChannel))
                   .map(Message::getPayload)
                   .map(String::toUpperCase)
                   .doOnNext(results::add)
                   .subscribe(v -> done.countDown());
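
The idea of adapting a pollable source to a demand-driven Publisher can be sketched with the JDK's own java.util.concurrent.Flow interfaces. This is an illustration only, not the MessageChannelReactiveUtils implementation: the toPublisher helper and UpperCaseCollector class below are hypothetical, the sketch is single-threaded, and it simply stops emitting when the queue is empty instead of waiting for new elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;

final class QueuePublisherSketch {

    /** Hypothetical adapter: the queue is polled only while the subscriber has demand. */
    static <T> Flow.Publisher<T> toPublisher(BlockingQueue<T> queue) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private long demand;
            private boolean draining;
            private boolean cancelled;

            @Override
            public void request(long n) {
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("demand must be positive"));
                    return;
                }
                demand += n;
                if (draining) {
                    return; // re-entrant call from onNext: the outer loop sees the new demand
                }
                draining = true;
                try {
                    while (demand > 0 && !cancelled) {
                        T item = queue.poll();
                        if (item == null) {
                            break; // simplification: no waiting for future elements
                        }
                        demand--;
                        subscriber.onNext(item);
                    }
                } finally {
                    draining = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    /** Collects upper-cased payloads; demand is signalled explicitly by the caller. */
    static final class UpperCaseCollector implements Flow.Subscriber<String> {
        final List<String> results = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { this.subscription = s; }
        @Override public void onNext(String item) { results.add(item.toUpperCase()); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("a", "b", "c"));
        UpperCaseCollector collector = new UpperCaseCollector();
        toPublisher(queue).subscribe(collector);
        collector.subscription.request(2); // only two elements leave the queue
        System.out.println(collector.results + ", still queued: " + queue.size());
    }
}
```

Nothing is pulled from the queue until request(n) is called, so a slow consumer never has elements pushed at it faster than it asked for.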

The same channel-to-Publisher adaptation is now used by the existing IntegrationFlowDefinition.toReactivePublisher():

@Bean
public Publisher<Message<Integer>> pollableReactiveFlow() {
    return IntegrationFlows
             .from("inputChannel")
             .split(s -> s.delimiters(","))
             .<String, Integer>transform(Integer::parseInt)
             .channel(MessageChannels.queue())
             .toReactivePublisher();
}
...
@Autowired
@Qualifier("pollableReactiveFlow")
private Publisher<Message<Integer>> pollablePublisher;

The ReactiveChannel can now subscribe to an upstream Publisher, alongside its regular (but back-pressure-aware) send(Message<?>) implementation. This allowed us to introduce features such as starting an IntegrationFlow from a Publisher:

Flux<Message<?>> messageFlux = Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
        IntegrationFlows.from(messageFlux)
                .<Integer, Integer>transform(p -> p * 2)
                .channel(resultChannel)
                .get();

this.integrationFlowContext.registration(integrationFlow)
        .register();

In this way, and by placing a ReactiveChannel between endpoints (MessageChannels.reactive()), we get the best of both the integration and reactive worlds!
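
For readers who want to see the "flow fed by a Publisher" pattern without any framework, here is a rough JDK-only sketch built on java.util.concurrent.SubmissionPublisher. It only mirrors the shape of the example above (split "1,2,3,4", double each value, collect into a queue); DoublingSubscriber and run are hypothetical names, and none of this reflects how ReactiveChannel is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class PublisherFedFlowSketch {

    /** Plays the role of the flow: doubles each payload and hands it to a result queue. */
    static final class DoublingSubscriber implements Flow.Subscriber<Integer> {
        final BlockingQueue<Integer> resultQueue = new LinkedBlockingQueue<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            this.subscription = s;
            s.request(1); // back pressure: ask for one element at a time
        }

        @Override
        public void onNext(Integer payload) {
            resultQueue.add(payload * 2);
            subscription.request(1); // ready for the next element
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the comma-separated integers and returns the doubled values in order. */
    static List<Integer> run(String csv) {
        DoublingSubscriber subscriber = new DoublingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String token : csv.split(",")) {
                publisher.submit(Integer.parseInt(token));
            }
        } // close() issues onComplete once the submitted items have been delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(subscriber.resultQueue);
    }

    public static void main(String[] args) {
        System.out.println(run("1,2,3,4"));
    }
}
```

The subscriber never requests more than one element ahead, so the publisher side can only run as fast as the downstream processing allows.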

See What’s New for more information.

We are going to provide more features and improvements in the next Milestones, so stay tuned and don’t hesitate to get back to us with any feedback!

Project Page | GitHub | Help | Documentation | Chat

Author: Spring
Original article: Spring Integration 5.0 Milestone 2 Available. Thanks to the original author for sharing.
