|
A couple of comments:
Can you create some stills class to extract the static methods related to MP paths from AbstractPipeline?
In ChoiceMessagingExceptionStrategy#getMessageProcessorPaths seems to be some duplication of code that is also in the AbstractPipeline static methods.
Pablo On Thu, Sep 27, 2012 at 3:17 PM, <[hidden email]> wrote:
- Revision
- 24919
- Author
- svacas
- Date
- 2012-09-27 13:17:37 -0500 (Thu, 27 Sep 2012)
Log Message
MULE-5751: Allow to programmatically distinguish internal MessageProcessor
delegate building of the path map to the message processor containers
Modified Paths
Diff
Modified: branches/mule-3.x/core/src/main/java/org/mule/api/construct/Pipeline.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/api/construct/Pipeline.java 2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/api/construct/Pipeline.java 2012-09-27 18:17:37 UTC (rev 24919)
@@ -11,6 +11,7 @@
package org.mule.api.construct;
import org.mule.api.processor.MessageProcessor;
+import org.mule.api.processor.MessageProcessorContainer;
import org.mule.api.processor.ProcessingStrategy;
import org.mule.api.source.MessageSource;
@@ -20,7 +21,7 @@
* A pipeline has an ordered list of {@link MessageProcessor}'s that are invoked in order to processor new
* messages received from it's {@link MessageSource}
*/
-public interface Pipeline extends FlowConstruct
+public interface Pipeline extends FlowConstruct, MessageProcessorContainer
{
public void setMessageSource(MessageSource messageSource);
@@ -37,5 +38,4 @@
public String getProcessorPath(MessageProcessor processor);
- public String[] getProcessorPaths();
}
Modified: branches/mule-3.x/core/src/main/java/org/mule/api/processor/MessageProcessorContainer.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/api/processor/MessageProcessorContainer.java 2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/api/processor/MessageProcessorContainer.java 2012-09-27 18:17:37 UTC (rev 24919)
@@ -10,17 +10,18 @@
package org.mule.api.processor;
-import java.util.List;
+import java.util.Map;
/**
* Identifies Constructs that contain Message Processors configured by the user.
*/
public interface MessageProcessorContainer
{
-
/**
- * @return the list of Message Processors configured by the user.
- * Internal Message Processors are omitted.
+ * Generates a map of the child message processors with the message processor
+ * instance as key and the identifier path as value.
+ *
+ * @return Map with the paths of the child message processors
*/
- List<MessageProcessor> getMessageProcessors();
+ Map<MessageProcessor, String> getMessageProcessorPaths();
}
Modified: branches/mule-3.x/core/src/main/java/org/mule/construct/AbstractPipeline.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/construct/AbstractPipeline.java 2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/construct/AbstractPipeline.java 2012-09-27 18:17:37 UTC (rev 24919)
@@ -314,49 +314,74 @@
logger.warn("flow map already populated");
return;
}
- List<MessageProcessor> mps = getMessageProcessors();
- createFlowMap(mps, "/" + getName() + "/processors/");
+ flowMap = getMessageProcessorPaths();
+ }
+
+ @Override
+ public Map<MessageProcessor, String> getMessageProcessorPaths()
+ {
+ Map<MessageProcessor, String> result = new LinkedHashMap<MessageProcessor, String>();
+ int index = 0;
+ String base = "/" + getName();
+ for (MessageProcessor mp : getMessageProcessors())
+ {
+ String prefix = base + "/processors/" + index;
+ result.put(mp, prefix);
+ if (mp instanceof MessageProcessorContainer)
+ {
+ Map<MessageProcessor, String> children = ((MessageProcessorContainer) mp).getMessageProcessorPaths();
+ prefixMessageProcessorPaths(prefix, children);
+ result.putAll(children);
+ }
+ index++;
+ }
if (exceptionListener instanceof MessageProcessorContainer)
{
- mps = ((MessageProcessorContainer) exceptionListener).getMessageProcessors();
- createFlowMap(mps, "/" + getName() + "/es/");
+ Map<MessageProcessor, String> esPathMap = ((MessageProcessorContainer) exceptionListener).getMessageProcessorPaths();
+ prefixMessageProcessorPaths(base + "/es", esPathMap);
+ result.putAll(esPathMap);
}
+ return result;
}
- private void createFlowMap(List<MessageProcessor> processors, String prefix)
+ public static Map<MessageProcessor, String> buildMessageProcessorPaths(List<MessageProcessor> processors)
{
- int idx = 0;
+ Map<MessageProcessor, String> result = new LinkedHashMap<MessageProcessor, String>();
+ int index = 0;
for (MessageProcessor mp : processors)
{
- if (mp == null)
- {
- logger.warn("NULL mp!");
- continue;
- }
- String currentPrefix = prefix + idx;
- flowMap.put(mp, currentPrefix);
+ String prefix = "/" + index;
+ result.put(mp, prefix);
if (mp instanceof MessageProcessorContainer)
{
- createFlowMap(((MessageProcessorContainer) mp).getMessageProcessors(), currentPrefix + "/");
+ Map<MessageProcessor, String> children = ((MessageProcessorContainer) mp).getMessageProcessorPaths();
+ prefixMessageProcessorPaths(prefix, children);
+ result.putAll(children);
}
- idx++;
+ index++;
}
+ return result;
}
- @Override
- public String getProcessorPath(MessageProcessor processor)
+ public static void prefixMessageProcessorPaths(String prefix, Map<MessageProcessor, String> pathMap)
{
- return flowMap.get(processor);
+ if (prefix.endsWith("/"))
+ {
+ prefix = prefix.substring(0, prefix.length() - 1);
+ }
+ for (Map.Entry entry : pathMap.entrySet())
+ {
+ entry.setValue(prefix + entry.getValue());
+ }
}
@Override
- public String[] getProcessorPaths()
+ public String getProcessorPath(MessageProcessor processor)
{
- String[] paths = new String[flowMap.size()];
- paths = flowMap.values().toArray(paths);
- return paths;
+ return flowMap.get(processor);
}
+
public class ProcessIfPipelineStartedMessageProcessor extends AbstractFilteringMessageProcessor
{
Modified: branches/mule-3.x/core/src/main/java/org/mule/exception/ChoiceMessagingExceptionStrategy.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/exception/ChoiceMessagingExceptionStrategy.java 2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/exception/ChoiceMessagingExceptionStrategy.java 2012-09-27 18:17:37 UTC (rev 24919)
@@ -19,12 +19,14 @@
import org.mule.api.processor.MessageProcessor;
import org.mule.api.processor.MessageProcessorContainer;
import org.mule.config.i18n.CoreMessages;
+import org.mule.construct.AbstractPipeline;
import org.mule.message.DefaultExceptionPayload;
import org.mule.processor.AbstractMuleObjectOwner;
-import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
/**
* Selects which exception strategy to execute based on filtering.
@@ -131,16 +133,21 @@
}
@Override
- public List<MessageProcessor> getMessageProcessors()
+ public Map<MessageProcessor, String> getMessageProcessorPaths()
{
- List<MessageProcessor> mps = new ArrayList<MessageProcessor>();
+ Map<MessageProcessor, String> mpPaths = new LinkedHashMap<MessageProcessor, String> ();
+ int idx = 0;
for(MessagingExceptionHandlerAcceptor listener : exceptionListeners)
{
+ String prefix = "/" + idx;
if (listener instanceof MessageProcessorContainer)
{
- mps.addAll(((MessageProcessorContainer) listener).getMessageProcessors());
+ Map<MessageProcessor, String> children = ((MessageProcessorContainer) listener).getMessageProcessorPaths();
+ AbstractPipeline.prefixMessageProcessorPaths(prefix, children);
+ mpPaths.putAll(children);
}
+ idx++;
}
- return mps;
+ return mpPaths;
}
}
Modified: branches/mule-3.x/core/src/main/java/org/mule/processor/AbstractMessageProcessorOwner.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/processor/AbstractMessageProcessorOwner.java 2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/processor/AbstractMessageProcessorOwner.java 2012-09-27 18:17:37 UTC (rev 24919)
@@ -9,19 +9,21 @@
*/
package org.mule.processor;
-import javax.xml.namespace.QName;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.mule.api.AnnotatedObject;
import org.mule.api.construct.FlowConstructAware;
import org.mule.api.context.MuleContextAware;
import org.mule.api.lifecycle.Lifecycle;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.processor.MessageProcessorContainer;
+import org.mule.construct.AbstractPipeline;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.xml.namespace.QName;
+
/**
* An object that owns message processors and delegates startup/shutdown events to them.
*/
@@ -53,9 +55,9 @@
protected abstract List<MessageProcessor> getOwnedMessageProcessors();
@Override
- public List<MessageProcessor> getMessageProcessors()
+ public Map<MessageProcessor, String> getMessageProcessorPaths()
{
- return getOwnedMessageProcessors();
+ return AbstractPipeline.buildMessageProcessorPaths(getOwnedMessageProcessors());
}
}
Modified: branches/mule-3.x/core/src/main/java/org/mule/processor/chain/AbstractMessageProcessorChain.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/processor/chain/AbstractMessageProcessorChain.java 2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/processor/chain/AbstractMessageProcessorChain.java 2012-09-27 18:17:37 UTC (rev 24919)
@@ -25,12 +25,14 @@
import org.mule.api.processor.MessageProcessor;
import org.mule.api.processor.MessageProcessorChain;
import org.mule.api.processor.MessageProcessorContainer;
+import org.mule.construct.AbstractPipeline;
import org.mule.endpoint.EndpointAware;
import org.mule.processor.AbstractInterceptingMessageProcessor;
import org.mule.util.StringUtils;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -182,4 +184,9 @@
return processors;
}
+ @Override
+ public Map<MessageProcessor, String> getMessageProcessorPaths()
+ {
+ return AbstractPipeline.buildMessageProcessorPaths(getMessageProcessors());
+ }
}
Modified: branches/mule-3.x/core/src/main/java/org/mule/routing/AbstractSelectiveRouter.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/routing/AbstractSelectiveRouter.java 2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/routing/AbstractSelectiveRouter.java 2012-09-27 18:17:37 UTC (rev 24919)
@@ -32,6 +32,7 @@
import org.mule.api.routing.SelectiveRouter;
import org.mule.api.routing.filter.Filter;
import org.mule.config.i18n.MessageFactory;
+import org.mule.construct.AbstractPipeline;
import org.mule.management.stats.RouterStatistics;
import java.util.ArrayList;
@@ -365,7 +366,7 @@
}
@Override
- public List<MessageProcessor> getMessageProcessors()
+ public Map<MessageProcessor, String> getMessageProcessorPaths()
{
List<MessageProcessor> messageProcessors = new ArrayList<MessageProcessor>();
for (MessageProcessorFilterPair cmp : conditionalMessageProcessors)
@@ -373,7 +374,7 @@
messageProcessors.add(cmp.getMessageProcessor());
}
messageProcessors.add(defaultProcessor);
- return messageProcessors;
+ return AbstractPipeline.buildMessageProcessorPaths(messageProcessors);
}
@Override
Modified: branches/mule-3.x/core/src/main/java/org/mule/routing/Foreach.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/routing/Foreach.java 2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/routing/Foreach.java 2012-09-27 18:17:37 UTC (rev 24919)
@@ -20,6 +20,7 @@
import org.mule.api.processor.MessageProcessor;
import org.mule.api.transformer.DataType;
import org.mule.api.transformer.TransformerException;
+import org.mule.construct.AbstractPipeline;
import org.mule.expression.ExpressionConfig;
import org.mule.processor.AbstractMessageProcessorOwner;
import org.mule.processor.chain.DefaultMessageProcessorChainBuilder;
@@ -152,10 +153,11 @@
}
@Override
- public List<MessageProcessor> getMessageProcessors()
+ public Map<MessageProcessor, String> getMessageProcessorPaths()
{
//skip the splitter that is added at the beginning
- return getOwnedMessageProcessors().subList(1, getOwnedMessageProcessors().size());
+ List<MessageProcessor> mps = getOwnedMessageProcessors().subList(1, getOwnedMessageProcessors().size());
+ return AbstractPipeline.buildMessageProcessorPaths(mps);
}
@Override
Modified: branches/mule-3.x/tests/integration/src/test/java/org/mule/context/notification/MessageProcessorNotificationPathTestCase.java (24918 => 24919)
--- branches/mule-3.x/tests/integration/src/test/java/org/mule/context/notification/MessageProcessorNotificationPathTestCase.java 2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/tests/integration/src/test/java/org/mule/context/notification/MessageProcessorNotificationPathTestCase.java 2012-09-27 18:17:37 UTC (rev 24919)
@@ -11,9 +11,11 @@
import org.mule.api.construct.FlowConstruct;
import org.mule.api.construct.Pipeline;
+import org.mule.api.processor.MessageProcessor;
import org.mule.tck.junit4.FunctionalTestCase;
import java.util.LinkedHashSet;
+import java.util.Map;
import java.util.Set;
import org.junit.Assert;
@@ -35,22 +37,22 @@
public void components() throws Exception
{
testFlowPaths("singleMP", "/0");
- testFlowPaths("processorChain", "/0/0", "/0/1");
+ testFlowPaths("processorChain", "/0", "/0/0", "/0/1");
}
@Test
public void routers() throws Exception
{
- testFlowPaths("choice", "/0/0/0", "/0/1/0", "/0/2/0");
- testFlowPaths("all", "/0/0/0", "/0/1/0");
+ testFlowPaths("choice", "/0", "/0/0", "/0/0/0", "/0/1", "/0/1/0", "/0/2", "/0/2/0");
+ testFlowPaths("all", "/0", "/0/0", "/0/0/0", "/0/1", "/0/1/0");
}
@Test
public void scopes() throws Exception
{
- testFlowPaths("foreach", "/0/0");
- testFlowPaths("enricher", "/0/0", "/1/0/0", "/1/0/1");
- testFlowPaths("until-successful", "/0/0/0", "/0/0/1");
+ testFlowPaths("foreach", "/0", "/0/0");
+ testFlowPaths("enricher", "/0", "/0/0", "/1", "/1/0", "/1/0/0", "/1/0/1");
+ testFlowPaths("until-successful", "/0", "/0/0", "/0/0/0", "/0/0/1");
//testFlowPaths("async", "/0/0", "/0/1");
}
@@ -64,34 +66,30 @@
public void exceptionStrategies() throws Exception
{
testFlowPaths("catch-es", "/0", "es/0");
- testFlowPaths("rollback-es", "/0", "es/0");
- testFlowPaths("choice-es", "/0", "es/0", "es/1");
+ testFlowPaths("rollback-es", "/0", "es/0", "es/1");
+ testFlowPaths("choice-es", "/0", "es/0/0", "es/0/1", "es/1/0");
}
- private void testFlowPaths(String flowName, String... leaves) throws Exception
+ private void testFlowPaths(String flowName, String... nodes) throws Exception
{
- String[] expectedPaths = generatePathsFromLeaves(flowName, leaves);
+ String[] expectedPaths = generatePaths(flowName, nodes);
FlowConstruct flow = getFlowConstruct(flowName);
- String[] flowPaths = ((Pipeline) flow).getProcessorPaths();
+ Map<MessageProcessor,String> messageProcessorPaths = ((Pipeline) flow).getMessageProcessorPaths();
+ String[] flowPaths = messageProcessorPaths.values().toArray(new String[]{});
Assert.assertArrayEquals(expectedPaths, flowPaths);
}
- private String[] generatePathsFromLeaves(String flowName, String[] leaves)
+ private String[] generatePaths(String flowName, String[] nodes)
{
Set<String> pathSet = new LinkedHashSet<String>();
String base = "/" + flowName + "/processors";
- for (String leaf : leaves)
+ for (String node : nodes)
{
- if (leaf.startsWith("es/"))
+ if (node.startsWith("es/"))
{
- base = "/" + flowName + "/es";
+ base = "/" + flowName + "/";
}
- String prefix = "/";
- for (String part : leaf.substring(leaf.indexOf("/") + 1).split("/"))
- {
- pathSet.add(base + prefix + part);
- prefix += part + "/";
- }
+ pathSet.add(base + node);
}
return pathSet.toArray(new String[0]);
}
To unsubscribe from this list please visit:
http://xircles.codehaus.org/manage_email
|