|
This has been fixed in 24766 and 24767. On Fri, Aug 17, 2012 at 3:18 PM, Pablo Kraan <[hidden email]> wrote:
Comments inline. On Fri, Aug 17, 2012 at 2:24 PM, Pablo La Greca <[hidden email]> wrote:
Comments below:
On Fri, Aug 17, 2012 at 1:36 PM, Pablo Kraan <[hidden email]> wrote:
Some comments:
Too many magic numbers: return objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(), false, -1, 60 * 5 * 1000, 6000 );
Not part of my changes
This is part of the revision:
+ ObjectStoreManager objectStoreManager = (ObjectStoreManager) muleContext.getRegistry().get(
+ MuleProperties.OBJECT_STORE_MANAGER);
+ return objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(), false, -1, 60 * 5 * 1000, 6000 );
}
Don't use imports with "*"
Will do.
Replace System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8"); with a SystemProperty rule
Will do.
Refactor to remove duplicate code on testMessageRedeliveryUsingMemory and testMessageRedeliveryUsingSerializationStore tests.
Will do.
I think you can replace the InMemoryObjectStore defined in the test class with SimpleMemoryObjectStore
I prefer not to make my code depend on SimpleMemoryObjectStore
Don't want my test to rely on something
Pablo On Thu, Aug 16, 2012 at 10:34 PM, <[hidden email]> wrote:
- Revision
- 24756
- Author
- pablo.lagreca
- Date
- 2012-08-16 20:34:27 -0500 (Thu, 16 Aug 2012)
Log Message
EE-2843 - changing redelivery policy to work in a clustered environment
Modified Paths
Added Paths
Diff
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24755 => 24756)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-16 21:58:19 UTC (rev 24755)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-17 01:34:27 UTC (rev 24756)
@@ -11,24 +11,25 @@
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
+import org.mule.api.config.MuleProperties;
import org.mule.api.lifecycle.Disposable;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.api.lifecycle.Startable;
-import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.processor.MessageProcessor;
+import org.mule.api.store.ObjectStore;
import org.mule.api.store.ObjectStoreException;
+import org.mule.api.store.ObjectStoreManager;
+import org.mule.api.transformer.TransformerException;
import org.mule.config.i18n.CoreMessages;
import org.mule.transformer.simple.ByteArrayToHexString;
import org.mule.transformer.simple.ObjectToByteArray;
-import org.mule.transformer.simple.SerializableToByteArray;
-import org.mule.util.store.AbstractMonitoredObjectStore;
-import org.mule.util.store.InMemoryObjectStore;
-
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.mule.util.store.ObjectStorePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +49,7 @@
private boolean useSecureHash;
private String messageDigestAlgorithm;
private String idExpression;
- private AbstractMonitoredObjectStore<AtomicInteger> store;
+ private ObjectStore<AtomicInteger> store;
@Override
public void initialise() throws InitialisationException
@@ -56,10 +57,11 @@
super.initialise();
if (useSecureHash && idExpression != null)
{
- throw new InitialisationException(
- CoreMessages.initialisationFailure(String.format(
- "The Id expression'%s' was specified when a secure hash will be used",
- idExpression)), this);
+ useSecureHash = false;
+ if (logger.isWarnEnabled())
+ {
+ logger.warn("Disabling useSecureHash in idempotent-redelivery-policy since an idExpression has been configured");
+ }
}
if (!useSecureHash && messageDigestAlgorithm != null)
{
@@ -96,15 +98,11 @@
store = createStore();
}
- private AbstractMonitoredObjectStore<AtomicInteger> createStore() throws InitialisationException
+ private ObjectStore<AtomicInteger> createStore() throws InitialisationException
{
- AbstractMonitoredObjectStore s = new InMemoryObjectStore<AtomicInteger>();
- s.setName(flowConstruct.getName() + "." + getClass().getName());
- s.setMaxEntries(-1);
- s.setEntryTTL(60 * 5 * 1000);
- s.setExpirationInterval(6000);
- s.initialise();
- return s;
+ ObjectStoreManager objectStoreManager = (ObjectStoreManager) muleContext.getRegistry().get(
+ MuleProperties.OBJECT_STORE_MANAGER);
+ return objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(), false, -1, 60 * 5 * 1000, 6000 );
}
@@ -115,7 +113,17 @@
if (store != null)
{
- store.dispose();
+ if (store instanceof ObjectStorePartition)
+ {
+ try
+ {
+ ((ObjectStorePartition)store).close();
+ }
+ catch (ObjectStoreException e)
+ {
+ logger.warn("error closing object store: " + e.getMessage(), e);
+ }
+ }
store = null;
}
@@ -147,6 +155,11 @@
{
messageId = getIdForEvent(event);
}
+ catch (TransformerException e)
+ {
+ logger.warn("The message cannot be processed because the digest could not be generated. Either make the payload serializable or use an expression.");
+ return null;
+ }
catch (Exception ex)
{
exceptionSeen = true;
@@ -154,7 +167,7 @@
if (!exceptionSeen)
{
- counter = getCounter(messageId, null, false);
+ counter = findCounter(messageId);
tooMany = counter != null && counter.get() > maxRedeliveryCount;
}
@@ -174,7 +187,7 @@
try
{
MuleEvent returnEvent = processNext(event);
- counter = getCounter(messageId, counter, false);
+ counter = findCounter(messageId);
if (counter != null)
{
counter.set(0);
@@ -183,51 +196,42 @@
}
catch (MuleException ex)
{
- incrementCounter(messageId, counter);
+ incrementCounter(messageId);
throw ex;
}
catch (RuntimeException ex)
{
- incrementCounter(messageId, counter);
+ incrementCounter(messageId);
throw ex;
}
}
-
-
- private AtomicInteger incrementCounter(String messageId, AtomicInteger counter) throws ObjectStoreException
+
+ public AtomicInteger findCounter(String messageId) throws ObjectStoreException
{
- counter = getCounter(messageId, counter, true);
- counter.incrementAndGet();
- return counter;
+ boolean counterExists = store.contains(messageId);
+ if (counterExists)
+ {
+ return store.retrieve(messageId);
+ }
+ return null;
}
- private AtomicInteger getCounter(String messageId, AtomicInteger counter, boolean create) throws ObjectStoreException
+ private AtomicInteger incrementCounter(String messageId) throws ObjectStoreException
{
- if (counter != null)
+ AtomicInteger counter = findCounter(messageId);
+ if (counter == null)
{
- return counter;
+ counter = new AtomicInteger();
}
- boolean counterExists = store.contains(messageId);
- if (counterExists)
+ else
{
- return store.retrieve(messageId);
+ store.remove(messageId);
}
- if (create)
- {
- try
- {
- counter = new AtomicInteger();
- store.store(messageId, counter);
- }
- catch (ObjectAlreadyExistsException e)
- {
- counter = store.retrieve(messageId);
- }
- }
+ counter.incrementAndGet();
+ store.store(messageId,counter);
return counter;
}
-
private String getIdForEvent(MuleEvent event) throws Exception
{
if (useSecureHash)
@@ -278,4 +282,10 @@
{
this.idExpression = idExpression;
}
+
+ public void setMessageProcessor(MessageProcessor processor)
+ {
+ this.deadLetterQueue = processor;
+ }
}
+
Added: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (0 => 24756)
--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java 2012-08-17 01:34:27 UTC (rev 24756)
@@ -0,0 +1,219 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.processor;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.mockito.Answers;
+import org.mockito.Mockito;
+import org.mockito.internal.verification.VerificationModeFactory;
+import org.mule.api.*;
+import org.mule.api.config.MuleProperties;
+import org.mule.api.construct.FlowConstruct;
+import org.mule.api.processor.MessageProcessor;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.api.store.ObjectStoreManager;
+import org.mule.routing.MessageProcessorFilterPair;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.mule.util.SerializationUtils;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
+{
+
+ public static final String STRING_MESSAGE = "message";
+ public static final int MAX_REDELIVERY_COUNT = 1;
+ private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
+ private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
+ private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+
+ @Before
+ public void setUpTest() throws MuleException
+ {
+ when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
+ System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+ }
+
+ @Test
+ public void messageDigestFailure() throws Exception
+ {
+ Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+ Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
+
+ IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
+ irp.setUseSecureHash(true);
+ irp.setMaxRedeliveryCount(1);
+ irp.setFlowConstruct(mock(FlowConstruct.class));
+ irp.setMuleContext(mockMuleContext);
+ irp.initialise();
+
+
+ when(message.getPayload()).thenReturn(new Object());
+
+ when(event.getMessage()).thenReturn(message);
+ MuleEvent process = irp.process(event);
+ Assert.assertNull(process);
+ }
+
+ @Test
+ public void testMessageRedeliveryUsingMemory() throws Exception
+ {
+ Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+ Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+
+ IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
+ irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+ irp.setUseSecureHash(true);
+ irp.setFlowConstruct(mock(FlowConstruct.class));
+ irp.setMuleContext(mockMuleContext);
+ irp.setListener(mockFailingMessageProcessor);
+ irp.setDeadLetterQueue(mockDlqMessageProcessor);
+ irp.initialise();
+
+ when(message.getPayload()).thenReturn(STRING_MESSAGE);
+ when(event.getMessage()).thenReturn(message);
+
+ for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+ {
+ try
+ {
+ irp.process(event);
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
+ }
+
+ @Test
+ public void testMessageRedeliveryUsingSerializationStore() throws Exception
+ {
+ Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+ Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
+
+ IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
+ irp.setUseSecureHash(true);
+ irp.setMaxRedeliveryCount(1);
+ irp.setFlowConstruct(mock(FlowConstruct.class));
+ irp.setMuleContext(mockMuleContext);
+ irp.setListener(mockFailingMessageProcessor);
+ irp.setDeadLetterQueue(mockDlqMessageProcessor);
+ irp.initialise();
+
+ when(message.getPayload()).thenReturn(STRING_MESSAGE);
+ when(event.getMessage()).thenReturn(message);
+
+ for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+ {
+ try
+ {
+ irp.process(event);
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
+ }
+
+ public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
+ {
+
+ private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+
+ @Override
+ public boolean contains(Serializable key) throws ObjectStoreException
+ {
+ return store.containsKey(key);
+ }
+
+ @Override
+ public void store(Serializable key, AtomicInteger value) throws ObjectStoreException
+ {
+ store.put(key, SerializationUtils.serialize(value));
+ }
+
+ @Override
+ public AtomicInteger retrieve(Serializable key) throws ObjectStoreException
+ {
+ Serializable serializable = store.get(key);
+ return (AtomicInteger) SerializationUtils.deserialize((byte[]) serializable);
+ }
+
+ @Override
+ public AtomicInteger remove(Serializable key) throws ObjectStoreException
+ {
+ Serializable serializable = store.remove(key);
+ return (AtomicInteger) SerializationUtils.deserialize((byte[]) serializable);
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return false;
+ }
+ }
+
+ public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
+ {
+ private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+
+ @Override
+ public boolean contains(Serializable key) throws ObjectStoreException
+ {
+ return store.containsKey(key);
+ }
+
+ @Override
+ public void store(Serializable key, AtomicInteger value) throws ObjectStoreException
+ {
+ store.put(key,value);
+ }
+
+ @Override
+ public AtomicInteger retrieve(Serializable key) throws ObjectStoreException
+ {
+ return store.get(key);
+ }
+
+ @Override
+ public AtomicInteger remove(Serializable key) throws ObjectStoreException
+ {
+ return store.remove(key);
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return false;
+ }
+ }
+
+}
+
+
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
To unsubscribe from this list please visit:
http://xircles.codehaus.org/manage_email
|