|
For the time being this is kind of an experimental branch, so the MuteMP interface may disappear altogether; otherwise I will give it a better name and document it properly. On Mon, Jul 23, 2012 at 11:04 AM, Pablo Kraan <[hidden email]> wrote:
MuteMessageProcessor needs a javadoc explaining its purpose. Maybe a better name would also help. In AsyncDelegateMessageProcessor#getMessageProcessors it could be better to return an empty list instead of throwing an exception. The same applies to SedaStageInterceptingMessageProcessor.
Pablo
- Revision
- 24632
- Author
- svacas
- Date
- 2012-07-18 11:19:39 -0500 (Wed, 18 Jul 2012)
Log Message
allow to obtain visible children message processors from parent message processors
Modified Paths
Added Paths
Diff
Added: branches/mule-3.x-debugger/core/src/main/java/org/mule/api/processor/MessageProcessorContainer.java (0 => 24632)
--- branches/mule-3.x-debugger/core/src/main/java/org/mule/api/processor/MessageProcessorContainer.java (rev 0)
+++ branches/mule-3.x-debugger/core/src/main/java/org/mule/api/processor/MessageProcessorContainer.java 2012-07-18 16:19:39 UTC (rev 24632)
@@ -0,0 +1,21 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.api.processor;
+
+import java.util.List;
+
+/**
+ *
+ */
+public interface MessageProcessorContainer
+{
+ List<MessageProcessor> getMessageProcessors();
+}
Property changes on: branches/mule-3.x-debugger/core/src/main/java/org/mule/api/processor/MessageProcessorContainer.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.x-debugger/core/src/main/java/org/mule/api/processor/MessageProcessors.java (24631 => 24632)
--- branches/mule-3.x-debugger/core/src/main/java/org/mule/api/processor/MessageProcessors.java 2012-07-18 16:11:35 UTC (rev 24631)
+++ branches/mule-3.x-debugger/core/src/main/java/org/mule/api/processor/MessageProcessors.java 2012-07-18 16:19:39 UTC (rev 24632)
@@ -24,6 +24,12 @@
import org.mule.api.lifecycle.Stoppable;
import org.mule.processor.chain.DefaultMessageProcessorChain;
+import com.sun.tools.internal.ws.wsdl.document.Message;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
/**
* Some convenience methods for message processors.
*/
@@ -41,15 +47,15 @@
public static MessageProcessor lifecyleAwareMessageProcessorWrapper(final MessageProcessor mp)
{
- return new LifecyleAwareMessageProcessorWrapper(mp);
+ return new LifecycleAwareMessageProcessorWrapper(mp);
}
- private static class LifecyleAwareMessageProcessorWrapper
- implements MessageProcessor, Lifecycle, MuleContextAware, FlowConstructAware
+ private static class LifecycleAwareMessageProcessorWrapper
+ implements MessageProcessor, Lifecycle, MuleContextAware, FlowConstructAware, MessageProcessorContainer, MuteMessageProcessor
{
private MessageProcessor delegate;
- public LifecyleAwareMessageProcessorWrapper(MessageProcessor delegate)
+ public LifecycleAwareMessageProcessorWrapper(MessageProcessor delegate)
{
this.delegate = delegate;
}
@@ -113,5 +119,11 @@
{
return delegate.process(event);
}
+
+ @Override
+ public List<MessageProcessor> getMessageProcessors()
+ {
+ return Collections.singletonList(delegate);
+ }
}
}
Added: branches/mule-3.x-debugger/core/src/main/java/org/mule/api/processor/MuteMessageProcessor.java (0 => 24632)
--- branches/mule-3.x-debugger/core/src/main/java/org/mule/api/processor/MuteMessageProcessor.java (rev 0)
+++ branches/mule-3.x-debugger/core/src/main/java/org/mule/api/processor/MuteMessageProcessor.java 2012-07-18 16:19:39 UTC (rev 24632)
@@ -0,0 +1,9 @@
+package org.mule.api.processor;
+
+/**
+ *
+ */
+public interface MuteMessageProcessor
+{
+
+}
Property changes on: branches/mule-3.x-debugger/core/src/main/java/org/mule/api/processor/MuteMessageProcessor.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.x-debugger/core/src/main/java/org/mule/exception/AbstractExceptionListener.java (24631 => 24632)
--- branches/mule-3.x-debugger/core/src/main/java/org/mule/exception/AbstractExceptionListener.java 2012-07-18 16:11:35 UTC (rev 24631)
+++ branches/mule-3.x-debugger/core/src/main/java/org/mule/exception/AbstractExceptionListener.java 2012-07-18 16:19:39 UTC (rev 24632)
@@ -81,11 +81,6 @@
}
}
- public List<MessageProcessor> getMessageProcessors()
- {
- return messageProcessors;
- }
-
public void setMessageProcessors(List<MessageProcessor> processors)
{
if (processors != null)
Modified: branches/mule-3.x-debugger/core/src/main/java/org/mule/processor/AbstractMessageProcessorOwner.java (24631 => 24632)
--- branches/mule-3.x-debugger/core/src/main/java/org/mule/processor/AbstractMessageProcessorOwner.java 2012-07-18 16:11:35 UTC (rev 24631)
+++ branches/mule-3.x-debugger/core/src/main/java/org/mule/processor/AbstractMessageProcessorOwner.java 2012-07-18 16:19:39 UTC (rev 24632)
@@ -20,11 +20,12 @@
import org.mule.api.context.MuleContextAware;
import org.mule.api.lifecycle.Lifecycle;
import org.mule.api.processor.MessageProcessor;
+import org.mule.api.processor.MessageProcessorContainer;
/**
* An object that owns message processors and delegates startup/shutdown events to them.
*/
-public abstract class AbstractMessageProcessorOwner extends AbstractMuleObjectOwner<MessageProcessor> implements Lifecycle, MuleContextAware, FlowConstructAware, AnnotatedObject
+public abstract class AbstractMessageProcessorOwner extends AbstractMuleObjectOwner<MessageProcessor> implements Lifecycle, MuleContextAware, FlowConstructAware, AnnotatedObject, MessageProcessorContainer
{
private final Map<QName, Object> annotations = new ConcurrentHashMap<QName, Object>();
@@ -51,5 +52,11 @@
protected abstract List<MessageProcessor> getOwnedMessageProcessors();
+ @Override
+ public List<MessageProcessor> getMessageProcessors()
+ {
+ return getOwnedMessageProcessors();
+ }
+
}
Modified: branches/mule-3.x-debugger/core/src/main/java/org/mule/processor/AsyncDelegateMessageProcessor.java (24631 => 24632)
--- branches/mule-3.x-debugger/core/src/main/java/org/mule/processor/AsyncDelegateMessageProcessor.java 2012-07-18 16:11:35 UTC (rev 24631)
+++ branches/mule-3.x-debugger/core/src/main/java/org/mule/processor/AsyncDelegateMessageProcessor.java 2012-07-18 16:19:39 UTC (rev 24632)
@@ -23,6 +23,7 @@
import org.mule.api.lifecycle.Stoppable;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.processor.MessageProcessorChainBuilder;
+import org.mule.api.processor.MessageProcessorContainer;
import org.mule.api.processor.ProcessingStrategy;
import org.mule.api.processor.ProcessingStrategy.StageNameSource;
import org.mule.config.i18n.CoreMessages;
@@ -44,8 +45,9 @@
* configured on the inbound endpoint. If a transaction is present then an exception is thrown.
*/
public class AsyncDelegateMessageProcessor extends AbstractMessageProcessorOwner
- implements MessageProcessor, Initialisable, Startable, Stoppable
+ implements MessageProcessor, Initialisable, Startable, Stoppable
{
+
protected Log logger = LogFactory.getLog(getClass());
protected MessageProcessor delegate;
@@ -79,16 +81,15 @@
StageNameSource nameSource = null;
if (name != null)
{
- nameSource = ((Flow)flowConstruct).getAsyncStageNameSource(name);
+ nameSource = ((Flow) flowConstruct).getAsyncStageNameSource(name);
}
else
{
- nameSource = ((Flow)flowConstruct).getAsyncStageNameSource();
+ nameSource = ((Flow) flowConstruct).getAsyncStageNameSource();
}
MessageProcessorChainBuilder builder = new DefaultMessageProcessorChainBuilder(flowConstruct);
- processingStrategy.configureProcessors(Collections.singletonList(delegate), nameSource, builder,
- muleContext);
+ processingStrategy.configureProcessors(Collections.singletonList(delegate), nameSource, builder, muleContext);
try
{
target = builder.build();
@@ -111,7 +112,7 @@
{
// Clone event and make it async
MuleEvent newEvent = new DefaultMuleEvent(
- (MuleMessage)((ThreadSafeAccess)event.getMessage()).newThreadCopy(), event, false);
+ (MuleMessage) ((ThreadSafeAccess) event.getMessage()).newThreadCopy(), event, false);
target.process(newEvent);
}
if (muleContext.getConfiguration().isFlowEndingWithOneWayEndpointReturnsNull())
@@ -140,6 +141,16 @@
return processingStrategy;
}
+ @Override
+ public List<MessageProcessor> getMessageProcessors()
+ {
+ if (!(delegate instanceof MessageProcessorContainer))
+ {
+ throw new IllegalStateException("delegate must be a MessageProcessorContainer");
+ }
+ return ((MessageProcessorContainer) delegate).getMessageProcessors();
+ }
+
class AsyncMessageProcessorWorker extends AbstractMuleEventWork
{
public AsyncMessageProcessorWorker(MuleEvent event)
Modified: branches/mule-3.x-debugger/core/src/main/java/org/mule/processor/ResponseMessageProcessorAdapter.java (24631 => 24632)
--- branches/mule-3.x-debugger/core/src/main/java/org/mule/processor/ResponseMessageProcessorAdapter.java 2012-07-18 16:11:35 UTC (rev 24631)
+++ branches/mule-3.x-debugger/core/src/main/java/org/mule/processor/ResponseMessageProcessorAdapter.java 2012-07-18 16:19:39 UTC (rev 24632)
@@ -24,8 +24,12 @@
import org.mule.api.lifecycle.Startable;
import org.mule.api.lifecycle.Stoppable;
import org.mule.api.processor.MessageProcessor;
+import org.mule.api.processor.MessageProcessorContainer;
-public class ResponseMessageProcessorAdapter extends AbstractResponseMessageProcessor implements Lifecycle, FlowConstructAware
+import java.util.Collections;
+import java.util.List;
+
+public class ResponseMessageProcessorAdapter extends AbstractResponseMessageProcessor implements Lifecycle, FlowConstructAware, MessageProcessorContainer
{
protected MessageProcessor responseProcessor;
@@ -118,4 +122,13 @@
this.flowConstruct = flowConstruct;
}
+ @Override
+ public List<MessageProcessor> getMessageProcessors()
+ {
+ if (!(responseProcessor instanceof MessageProcessorContainer))
+ {
+ throw new IllegalStateException("ResponseProcessor must be a MessageProcessorContainer");
+ }
+ return ((MessageProcessorContainer) responseProcessor).getMessageProcessors();
+ }
}
Modified: branches/mule-3.x-debugger/core/src/main/java/org/mule/processor/SedaStageInterceptingMessageProcessor.java (24631 => 24632)
--- branches/mule-3.x-debugger/core/src/main/java/org/mule/processor/SedaStageInterceptingMessageProcessor.java 2012-07-18 16:11:35 UTC (rev 24631)
+++ branches/mule-3.x-debugger/core/src/main/java/org/mule/processor/SedaStageInterceptingMessageProcessor.java 2012-07-18 16:19:39 UTC (rev 24632)
@@ -25,6 +25,8 @@
import org.mule.api.lifecycle.LifecycleCallback;
import org.mule.api.lifecycle.LifecycleException;
import org.mule.api.processor.MessageProcessor;
+import org.mule.api.processor.MessageProcessorContainer;
+import org.mule.api.processor.MuteMessageProcessor;
import org.mule.api.service.FailedToQueueEventException;
import org.mule.config.QueueProfile;
import org.mule.config.i18n.CoreMessages;
@@ -32,6 +34,7 @@
import org.mule.execution.TransactionalErrorHandlingExecutionTemplate;
import org.mule.lifecycle.EmptyLifecycleCallback;
import org.mule.management.stats.QueueStatistics;
+import org.mule.processor.chain.InterceptingChainLifecycleWrapper;
import org.mule.service.Pausable;
import org.mule.service.Resumable;
import org.mule.util.concurrent.WaitableBoolean;
@@ -41,16 +44,21 @@
import org.mule.work.MuleWorkManager;
import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
+import org.apache.commons.collections.ListUtils;
+
/**
* Processes {@link MuleEvent}'s asynchronously using a {@link MuleWorkManager} to schedule asynchronous
* processing of the next {@link MessageProcessor}.
*/
public class SedaStageInterceptingMessageProcessor extends AsyncInterceptingMessageProcessor
- implements Work, Lifecycle, Pausable, Resumable
+ implements Work, Lifecycle, Pausable, Resumable, MessageProcessorContainer
{
protected static final String QUEUE_NAME_PREFIX = "seda.queue";
@@ -424,4 +432,13 @@
lifecycleManager.fireResumePhase(new EmptyLifecycleCallback<SedaStageInterceptingMessageProcessor>());
}
+ @Override
+ public List<MessageProcessor> getMessageProcessors()
+ {
+ if (next instanceof InterceptingChainLifecycleWrapper)
+ {
+ return ((InterceptingChainLifecycleWrapper) next).getMessageProcessors();
+ }
+ return Collections.singletonList(next);
+ }
}
Modified: branches/mule-3.x-debugger/core/src/main/java/org/mule/processor/chain/InterceptingChainLifecycleWrapper.java (24631 => 24632)
--- branches/mule-3.x-debugger/core/src/main/java/org/mule/processor/chain/InterceptingChainLifecycleWrapper.java 2012-07-18 16:11:35 UTC (rev 24631)
+++ branches/mule-3.x-debugger/core/src/main/java/org/mule/processor/chain/InterceptingChainLifecycleWrapper.java 2012-07-18 16:19:39 UTC (rev 24632)
@@ -14,8 +14,10 @@
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.context.MuleContextAware;
+import org.mule.api.processor.AbstractMessageProcessorVisibilityAware;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.processor.MessageProcessorChain;
+import org.mule.api.processor.MessageProcessorContainer;
import org.mule.execution.MessageProcessorExecutionTemplate;
import java.util.List;
@@ -26,7 +28,7 @@
* MessageProcessor in the parent chain is not injected into the first in the nested
* chain.
*/
-public class InterceptingChainLifecycleWrapper extends AbstractMessageProcessorChain
+public class InterceptingChainLifecycleWrapper extends AbstractMessageProcessorChain implements MessageProcessorContainer
{
private MessageProcessorChain chain;
private MessageProcessorExecutionTemplate messageProcessorExecutionTemplate = MessageProcessorExecutionTemplate.createExecutionTemplate();
Modified: branches/mule-3.x-debugger/core/src/main/java/org/mule/routing/AbstractSelectiveRouter.java (24631 => 24632)
--- branches/mule-3.x-debugger/core/src/main/java/org/mule/routing/AbstractSelectiveRouter.java 2012-07-18 16:11:35 UTC (rev 24631)
+++ branches/mule-3.x-debugger/core/src/main/java/org/mule/routing/AbstractSelectiveRouter.java 2012-07-18 16:19:39 UTC (rev 24632)
@@ -25,6 +25,7 @@
import org.mule.api.lifecycle.Startable;
import org.mule.api.lifecycle.Stoppable;
import org.mule.api.processor.MessageProcessor;
+import org.mule.api.processor.MessageProcessorContainer;
import org.mule.api.routing.RoutePathNotFoundException;
import org.mule.api.routing.RouterResultsHandler;
import org.mule.api.routing.RouterStatisticsRecorder;
@@ -44,9 +45,10 @@
import javax.xml.namespace.QName;
import org.apache.commons.collections.ListUtils;
+import org.apache.commons.collections.Transformer;
public abstract class AbstractSelectiveRouter
- implements SelectiveRouter, RouterStatisticsRecorder, Lifecycle, FlowConstructAware, MuleContextAware, AnnotatedObject
+ implements SelectiveRouter, RouterStatisticsRecorder, Lifecycle, FlowConstructAware, MuleContextAware, AnnotatedObject, MessageProcessorContainer
{
private final List<MessageProcessorFilterPair> conditionalMessageProcessors = new ArrayList<MessageProcessorFilterPair>();
private MessageProcessor defaultProcessor;
@@ -369,4 +371,16 @@
return String.format("%s [flow-construct=%s, started=%s]", getClass().getSimpleName(),
flowConstruct != null ? flowConstruct.getName() : null, started);
}
+
+ @Override
+ public List<MessageProcessor> getMessageProcessors()
+ {
+ List<MessageProcessor> messageProcessors = new ArrayList<MessageProcessor>();
+ for (MessageProcessorFilterPair cmp : conditionalMessageProcessors)
+ {
+ messageProcessors.add(cmp.getMessageProcessor());
+ }
+ messageProcessors.add(defaultProcessor);
+ return messageProcessors;
+ }
}
Modified: branches/mule-3.x-debugger/core/src/main/java/org/mule/routing/Foreach.java (24631 => 24632)
--- branches/mule-3.x-debugger/core/src/main/java/org/mule/routing/Foreach.java 2012-07-18 16:11:35 UTC (rev 24631)
+++ branches/mule-3.x-debugger/core/src/main/java/org/mule/routing/Foreach.java 2012-07-18 16:19:39 UTC (rev 24632)
@@ -186,11 +186,9 @@
splitter.setBatchSize(batchSize);
splitter.setCounterVariableName(counterVariableName);
splitter.setMuleContext(muleContext);
- messageProcessors.add(0, splitter);
-
try
{
- this.ownedMessageProcessor = new DefaultMessageProcessorChainBuilder().chain(messageProcessors)
+ this.ownedMessageProcessor = new DefaultMessageProcessorChainBuilder().chain(splitter).chain(messageProcessors)
.build();
}
catch (MuleException e)
Modified: branches/mule-3.x-debugger/core/src/main/java/org/mule/transport/AbstractConnector.java (24631 => 24632)
--- branches/mule-3.x-debugger/core/src/main/java/org/mule/transport/AbstractConnector.java 2012-07-18 16:11:35 UTC (rev 24631)
+++ branches/mule-3.x-debugger/core/src/main/java/org/mule/transport/AbstractConnector.java 2012-07-18 16:19:39 UTC (rev 24632)
@@ -2624,7 +2624,7 @@
// message only after successful send/dispatch
notificationMessageProcessor.dispatchNotification(beginNotification);
notificationMessageProcessor.process((result != null && !VoidMuleEvent.getInstance().equals(
- result)) ? result : event);
+ result)) ? result : event);
return result;
}
Modified: branches/mule-3.x-debugger/modules/spring-config/src/main/java/org/mule/config/spring/factories/MessageProcessorChainFactoryBean.java (24631 => 24632)
--- branches/mule-3.x-debugger/modules/spring-config/src/main/java/org/mule/config/spring/factories/MessageProcessorChainFactoryBean.java 2012-07-18 16:11:35 UTC (rev 24631)
+++ branches/mule-3.x-debugger/modules/spring-config/src/main/java/org/mule/config/spring/factories/MessageProcessorChainFactoryBean.java 2012-07-18 16:19:39 UTC (rev 24632)
@@ -55,7 +55,7 @@
"MessageProcessorBuilder should only have MessageProcessor's or MessageProcessorBuilder's configured");
}
}
- return MessageProcessors.lifecyleAwareMessageProcessorWrapper(builder.build());
+ return builder.build();
}
public boolean isSingleton()
Modified: branches/mule-3.x-debugger/modules/spring-config/src/main/java/org/mule/config/spring/factories/TransactionalMessageProcessorsFactoryBean.java (24631 => 24632)
--- branches/mule-3.x-debugger/modules/spring-config/src/main/java/org/mule/config/spring/factories/TransactionalMessageProcessorsFactoryBean.java 2012-07-18 16:11:35 UTC (rev 24631)
+++ branches/mule-3.x-debugger/modules/spring-config/src/main/java/org/mule/config/spring/factories/TransactionalMessageProcessorsFactoryBean.java 2012-07-18 16:19:39 UTC (rev 24632)
@@ -70,7 +70,7 @@
"MessageProcessorBuilder should only have MessageProcessor's or MessageProcessorBuilder's configured");
}
}
- return MessageProcessors.lifecyleAwareMessageProcessorWrapper(builder.build());
+ return builder.build();
}
protected TransactionFactory getTransactionFactory()
To unsubscribe from this list please visit:
http://xircles.codehaus.org/manage_email
|