Re: [mule-scm] [mule][24786] branches/mule-3.2.x: MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Re: [mule-scm] [mule][24786] branches/mule-3.2.x: MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy

Daniel Feist
You currently need to add everything you add to default-mule-config.xml to org.mule.config.builders.DefaultsConfigurationBuilder also.

Dan

On Aug 21, 2012, at 2:18 AM, [hidden email] wrote:

[mule][24786] branches/mule-3.2.x: MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy
Revision
24786
Author
pablo.lagreca
Date
2012-08-21 00:18:14 -0500 (Tue, 21 Aug 2012)

Log Message

MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy

Modified Paths

  • <a href="x-msg://4/#branchesmule32xcoresrcmainjavaorgmuleapiconfigMulePropertiesjava">branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java
  • <a href="x-msg://4/#branchesmule32xcoresrcmainjavaorgmuleprocessorAbstractRedeliveryPolicyjava">branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java
  • <a href="x-msg://4/#branchesmule32xcoresrcmainjavaorgmuleprocessorIdempotentRedeliveryPolicyjava">branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java
  • <a href="x-msg://4/#branchesmule32xcoresrctestjavaorgmuleprocessorIdempotentRedeliveryPolicyTestCasejava">branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
  • <a href="x-msg://4/#branchesmule32xmodulesspringconfigsrcmainresourcesdefaultmuleconfigxml">branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml

Added Paths

  • branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/
  • <a href="x-msg://4/#branchesmule32xcoresrcmainjavaorgmuleutillockLockjava">branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
  • <a href="x-msg://4/#branchesmule32xcoresrcmainjavaorgmuleutillockLockFactoryjava">branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
  • <a href="x-msg://4/#branchesmule32xcoresrcmainjavaorgmuleutillockServerLockjava">branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
  • <a href="x-msg://4/#branchesmule32xcoresrcmainjavaorgmuleutillockServerLockFactoryjava">branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
  • <a href="x-msg://4/#branchesmule32xcoresrctestjavaorgmuletckjunit4ruleSystemPropertyjava">branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
  • branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/
  • <a href="x-msg://4/#branchesmule32xcoresrctestjavaorgmuleutillockServerLockTestCasejava">branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java

Diff

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -148,6 +148,7 @@
     public static final String OBJECT_DEFAULT_RETRY_POLICY_TEMPLATE = "_defaultRetryPolicyTemplate";
     public static final String OBJECT_MULE_CONFIGURATION = "_muleConfiguration";
     public static final String OBJECT_MULE_NAMESPACE_MANAGER = "_muleNamespaceManager";
+    public static final String OBJECT_LOCK_FACTORY = "_muleLockFactory";
 
     // Not currently used as these need to be instance variables of the MuleContext.
     public static final String OBJECT_WORK_MANAGER = "_muleWorkManager";

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -49,7 +49,7 @@
     @Override
     public void initialise() throws InitialisationException
     {
-        if (maxRedeliveryCount < 1)
+        if (maxRedeliveryCount < 0)
         {
             throw new InitialisationException(
                 CoreMessages.initialisationFailure(

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -25,10 +25,13 @@
 import org.mule.transformer.simple.ObjectToByteArray;
 
 import java.io.InputStream;
+import java.io.Serializable;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.mule.util.lock.Lock;
+import org.mule.util.lock.LockFactory;
 import org.mule.util.store.ObjectStorePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +53,7 @@
     private String messageDigestAlgorithm;
     private String idExpression;
     private ObjectStore<AtomicInteger> store;
+    private Lock<Serializable> lock;
 
     @Override
     public void initialise() throws InitialisationException
@@ -95,6 +99,11 @@
             }
         }
 
+        String appName = muleContext.getConfiguration().getId();
+        String flowName = flowConstruct.getName();
+        String idrId = String.format("%s-%s-%s",appName,flowName,"idr");
+        lock = ((LockFactory<Serializable>)muleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).createLock(idrId);
+
         store = createStore();
     }
 
@@ -165,45 +174,55 @@
             exceptionSeen = true;
         }
 
-        if (!exceptionSeen)
+        lock.lock(messageId);
+        try
         {
-            counter = findCounter(messageId);
-            tooMany = counter != null && counter.get() > maxRedeliveryCount;
-        }
 
-        if (tooMany || exceptionSeen)
-        {
-            try
+            if (!exceptionSeen)
             {
-                return deadLetterQueue.process(event);
+                counter = findCounter(messageId);
+                tooMany = counter != null && counter.get() > maxRedeliveryCount;
             }
-            catch (Exception ex)
+
+            if (tooMany || exceptionSeen)
             {
-                logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                try
+                {
+                    return deadLetterQueue.process(event);
+                }
+                catch (Exception ex)
+                {
+                    logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                }
+                return null;
             }
-            return null;
-        }
 
-        try
-        {
-            MuleEvent returnEvent = processNext(event);
-            counter = findCounter(messageId);
-            if (counter != null)
+            try
             {
-                resetCounter(messageId);
+                MuleEvent returnEvent = processNext(event);
+                counter = findCounter(messageId);
+                if (counter != null)
+                {
+                    resetCounter(messageId);
+                }
+                return returnEvent;
             }
-            return returnEvent;
+            catch (MuleException ex)
+            {
+                incrementCounter(messageId);
+                throw ex;
+            }
+            catch (RuntimeException ex)
+            {
+                incrementCounter(messageId);
+                throw ex;
+            }
         }
-        catch (MuleException ex)
+        finally
         {
-            incrementCounter(messageId);
-            throw ex;
+            lock.unlock(messageId);
         }
-        catch (RuntimeException ex)
-        {
-            incrementCounter(messageId);
-            throw ex;
-        }
+
     }
 
     private void resetCounter(String messageId) throws ObjectStoreException

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,30 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.mule.api.lifecycle.Disposable;
+
+/**
+ * Interface to provide a locking mechanism to use in mule components
+ */
+public interface Lock<T> extends Disposable
+{
+
+    /*
+     * Gets a lock over the resource identified with lockId
+     */
+    void lock(T lockId);
+
+    /*
+     * Releases lock over the resource identified with lockId
+     */
+    void unlock(T lockId);
+    
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,25 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+/**
+ * Factory for creating Lock instances.
+ *
+ * Default LockFactory can be override by modules using registry-bootstrap.
+ */
+public interface LockFactory<T>
+{
+
+    /**
+     * Creates a Lock for a given resource using the resource unique identifier.
+     */
+    Lock<T> createLock(String lockResourceName);
+    
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,104 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Default implementation of the Lock interface. Useful for doing locking in a single mule instance.
+ */
+public class ServerLock<T> implements Lock<T>
+{
+    private Map<T, LockEntry> locks;
+    private Object acquireLock = new Object();
+
+    public ServerLock()
+    {
+        this.locks = new HashMap<T,LockEntry>();
+    }
+
+    public void lock(T key)
+    {
+        LockEntry lock;
+        synchronized (acquireLock)
+        {
+            if (this.locks.containsKey(key))
+            {
+                lock = this.locks.get(key);
+            }
+            else
+            {
+                lock = new LockEntry();
+                this.locks.put(key,lock);
+            }
+            lock.incrementLockCount();
+            acquireLock.notifyAll();
+        }
+        lock.lock();
+    }
+
+    public void unlock(T key)
+    {
+        synchronized (acquireLock)
+        {
+            LockEntry lock = this.locks.get(key);
+            if (lock != null)
+            {
+                lock.decrementLockCount();
+                if (!lock.hasPendingLocks())
+                {
+                    this.locks.remove(key);
+                }
+                lock.unlock();
+            }
+            acquireLock.notifyAll();
+        }
+    }
+
+    public static class LockEntry
+    {
+        private AtomicInteger lockCount  = new AtomicInteger(0);
+        private ReentrantLock lock = new ReentrantLock(true);
+
+        public void lock()
+        {
+            lock.lock();
+        }
+
+        public void incrementLockCount()
+        {
+            lockCount.incrementAndGet();
+        }
+
+        public void decrementLockCount()
+        {
+            lockCount.decrementAndGet();
+        }
+
+        public void unlock()
+        {
+            lock.unlock();
+        }
+
+        public boolean hasPendingLocks()
+        {
+            return lockCount.get() > 0;
+        }
+    }
+
+    @Override
+    public void dispose()
+    {
+        locks.clear();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,19 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+public class ServerLockFactory<T> implements LockFactory<T>
+{
+    @Override
+    public Lock<T> createLock(String lockResourceName)
+    {
+        return new ServerLock<T>();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -10,71 +10,97 @@
 
 package org.mule.processor;
 
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
+import junit.framework.Assert;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
 import org.mockito.Answers;
-import org.mockito.Mockito;
 import org.mockito.internal.verification.VerificationModeFactory;
-import org.mule.api.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
 import org.mule.api.config.MuleProperties;
 import org.mule.api.construct.FlowConstruct;
 import org.mule.api.processor.MessageProcessor;
 import org.mule.api.store.ObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
-import org.mule.routing.MessageProcessorFilterPair;
 import org.mule.tck.junit4.AbstractMuleTestCase;
-
-import org.junit.Test;
-
-import junit.framework.Assert;
+import org.mule.tck.junit4.rule.SystemProperty;
 import org.mule.util.SerializationUtils;
+import org.mule.util.concurrent.Latch;
+import org.mule.util.lock.ServerLock;
+import org.mule.util.lock.ServerLockFactory;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
 {
 
     public static final String STRING_MESSAGE = "message";
-    public static final int MAX_REDELIVERY_COUNT = 1;
+    public static final int MAX_REDELIVERY_COUNT = 0;
     private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
     private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
     private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
-    private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+    private Latch waitLatch = new Latch();
+    private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+    private final IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
 
+    @Rule
+    public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+
     @Before
     public void setUpTest() throws MuleException
     {
         when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
-        System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+        when(mockWaitingMessageProcessor.process(event)).thenAnswer(new Answer<MuleEvent>()
+        {
+            @Override
+            public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                waitingMessageProcessorExecutionLatch.countDown();
+                waitLatch.await(2000, TimeUnit.MILLISECONDS);
+                return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+            }
+        });
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).thenReturn(new ServerLockFactory());
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+        InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+        when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+        when(event.getMessage()).thenReturn(message);
+        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+        irp.setUseSecureHash(true);
+        irp.setFlowConstruct(mock(FlowConstruct.class));
+        irp.setMuleContext(mockMuleContext);
+        irp.setListener(mockFailingMessageProcessor);
+        irp.setMessageProcessor(mockDlqMessageProcessor);
+
     }
 
     @Test
     public void messageDigestFailure() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
+        when(message.getPayload()).thenReturn(new Object());
         irp.initialise();
-
-
-        when(message.getPayload()).thenReturn(new Object());
-
-        when(event.getMessage()).thenReturn(message);
         MuleEvent process = irp.process(event);
         Assert.assertNull(process);
     }
@@ -82,22 +108,42 @@
     @Test
     public void testMessageRedeliveryUsingMemory() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
-        irp.setUseSecureHash(true);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
+    @Test
+    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    {
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
         irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
+    @Test
+    public void testThreadSafeObjectStoreUsage() throws Exception
+    {
         when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
+        irp.setListener(mockWaitingMessageProcessor);
+        irp.initialise();
 
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+        firstIrpExecutionThread.start();
+        ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+        threadCausingRedeliveryException.start();
+        waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+        waitLatch.release();
+        firstIrpExecutionThread.join();
+        threadCausingRedeliveryException.join();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
+
+    private void processUntilFailure()
+    {
+        for (int i = 0; i < MAX_REDELIVERY_COUNT + 2; i++)
         {
             try
             {
@@ -107,28 +153,14 @@
             {
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
 
-    @Test
-    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    public class ExecuteIrpThread extends Thread
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
-        irp.initialise();
-
-        when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
-
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        public Exception exception;
+        
+        @Override
+        public void run()
         {
             try
             {
@@ -136,15 +168,18 @@
             }
             catch (Exception e)
             {
+                exception = e;
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
+   
+    
 
     public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
     {
 
         private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+        private ServerLock lockableObjectStore = new ServerLock();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -177,11 +212,13 @@
         {
             return false;
         }
+
     }
 
     public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
     {
         private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+        private ServerLock lockableObjectStore = new ServerLock();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -212,6 +249,7 @@
         {
             return false;
         }
+
     }
 
 }

Added: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (0 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,90 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+    private final String name;
+    private String value;
+    private boolean initialized;
+    private String oldValue;
+
+    public SystemProperty(String name)
+    {
+        this(name, null);
+    }
+
+    public SystemProperty(String name, String value)
+    {
+        this.name = name;
+        this.value = value;
+    }
+
+    @Override
+    protected void before() throws Throwable
+    {
+        if (initialized)
+        {
+            throw new IllegalArgumentException("System property was already initialized");
+        }
+
+        oldValue = System.setProperty(name, getValue());
+        initialized = true;
+    }
+
+    @Override
+    protected void after()
+    {
+        if (!initialized)
+        {
+            throw new IllegalArgumentException("System property was not initialized");
+        }
+
+        doCleanUp();
+        restoreOldValue();
+
+        initialized = false;
+    }
+
+    protected void restoreOldValue()
+    {
+        if (oldValue == null)
+        {
+            System.clearProperty(name);
+        }
+        else
+        {
+            System.setProperty(name, oldValue);
+        }
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    protected void doCleanUp()
+    {
+        // Nothing to do
+    };
+
+    public String getValue()
+    {
+        return value;
+    };
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (0 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,151 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class ServerLockTestCase extends AbstractMuleTestCase
+{
+    public static final int THREAD_COUNT = 100;
+    public static final int ITERATIONS_PER_THREAD = 100;
+    private Latch threadStartLatch = new Latch();
+    private String sharedKeyA = "A";
+    private String sharedKeyB = "B";
+    private ServerLock<String> serverLock = new ServerLock<String>();
+    private InMemoryObjectStore objectStore  = new InMemoryObjectStore();
+
+    @Test
+    public void testHighConcurrency() throws Exception
+    {
+        List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+        for (int i = 0; i < THREAD_COUNT; i++)
+        {
+            IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+            incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+        }
+        threadStartLatch.release();
+        for (Thread thread : threads)
+        {
+            thread.join();
+        }
+        assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+        assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+    }
+    
+    public class IncrementKeyValueThread extends Thread
+    {
+        private String key;
+
+        public IncrementKeyValueThread(String key)
+        {
+            super("Thread-" + key);
+            this.key = key;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+                for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+                {
+                    if (Thread.interrupted())
+                    {
+                        break;
+                    }
+                    serverLock.lock(key);
+                    try
+                    {
+                        Integer value;
+                        if (objectStore.contains(key))
+                        {
+                            value = objectStore.retrieve(key);
+                            objectStore.remove(key);
+                        }
+                        else
+                        {
+                            value = 0;
+                        }
+                        objectStore.store(key,value + 1);
+                    }
+                    finally
+                    {
+                        serverLock.unlock(key);
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static class InMemoryObjectStore implements ObjectStore<Integer>
+    {
+        private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+        @Override
+        public boolean contains(Serializable key) throws ObjectStoreException
+        {
+            return store.containsKey(key);
+        }
+
+        @Override
+        public void store(Serializable key, Integer value) throws ObjectStoreException
+        {
+            if (store.containsKey(key))
+            {
+                throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+            }
+            store.put(key,value);
+        }
+
+        @Override
+        public Integer retrieve(Serializable key) throws ObjectStoreException
+        {
+            return store.get(key);
+        }
+
+        @Override
+        public Integer remove(Serializable key) throws ObjectStoreException
+        {
+            return store.remove(key);
+        }
+
+        @Override
+        public boolean isPersistent()
+        {
+            return false;
+        }
+    }
+
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml (24785 => 24786)


--- branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml	2012-08-21 05:18:14 UTC (rev 24786)
@@ -86,6 +86,8 @@
 
     <bean name="_defaultRetryPolicyTemplate" class="org.mule.retry.policies.NoRetryPolicyTemplate"/>
 
+    <bean name="_muleLockFactory" class="org.mule.util.lock.ServerLockFactory"/>
+
     <!-- Default Transformers are now loaded from META-INF/services/org/mule/config/registry-bootstrap.properties so that
     the transformers will be available even when using the TransientRegistry only -->
 

To unsubscribe from this list please visit:

http://xircles.codehaus.org/manage_email


Reply | Threaded
Open this post in threaded view
|

Re: [mule-scm] [mule][24786] branches/mule-3.2.x: MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy

Daniel Feist
Does IdempotentMessageFilter need the same?

Dan

On Aug 21, 2012, at 2:18 AM, [hidden email] wrote:

[mule][24786] branches/mule-3.2.x: MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy
Revision
24786
Author
pablo.lagreca
Date
2012-08-21 00:18:14 -0500 (Tue, 21 Aug 2012)

Log Message

MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy

Modified Paths

  • <a href="x-msg://8/#branchesmule32xcoresrcmainjavaorgmuleapiconfigMulePropertiesjava">branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java
  • <a href="x-msg://8/#branchesmule32xcoresrcmainjavaorgmuleprocessorAbstractRedeliveryPolicyjava">branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java
  • <a href="x-msg://8/#branchesmule32xcoresrcmainjavaorgmuleprocessorIdempotentRedeliveryPolicyjava">branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java
  • <a href="x-msg://8/#branchesmule32xcoresrctestjavaorgmuleprocessorIdempotentRedeliveryPolicyTestCasejava">branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
  • <a href="x-msg://8/#branchesmule32xmodulesspringconfigsrcmainresourcesdefaultmuleconfigxml">branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml

Added Paths

  • branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/
  • <a href="x-msg://8/#branchesmule32xcoresrcmainjavaorgmuleutillockLockjava">branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
  • <a href="x-msg://8/#branchesmule32xcoresrcmainjavaorgmuleutillockLockFactoryjava">branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
  • <a href="x-msg://8/#branchesmule32xcoresrcmainjavaorgmuleutillockServerLockjava">branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
  • <a href="x-msg://8/#branchesmule32xcoresrcmainjavaorgmuleutillockServerLockFactoryjava">branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
  • <a href="x-msg://8/#branchesmule32xcoresrctestjavaorgmuletckjunit4ruleSystemPropertyjava">branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
  • branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/
  • <a href="x-msg://8/#branchesmule32xcoresrctestjavaorgmuleutillockServerLockTestCasejava">branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java

Diff

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -148,6 +148,7 @@
     public static final String OBJECT_DEFAULT_RETRY_POLICY_TEMPLATE = "_defaultRetryPolicyTemplate";
     public static final String OBJECT_MULE_CONFIGURATION = "_muleConfiguration";
     public static final String OBJECT_MULE_NAMESPACE_MANAGER = "_muleNamespaceManager";
+    public static final String OBJECT_LOCK_FACTORY = "_muleLockFactory";
 
     // Not currently used as these need to be instance variables of the MuleContext.
     public static final String OBJECT_WORK_MANAGER = "_muleWorkManager";

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -49,7 +49,7 @@
     @Override
     public void initialise() throws InitialisationException
     {
-        if (maxRedeliveryCount < 1)
+        if (maxRedeliveryCount < 0)
         {
             throw new InitialisationException(
                 CoreMessages.initialisationFailure(

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -25,10 +25,13 @@
 import org.mule.transformer.simple.ObjectToByteArray;
 
 import java.io.InputStream;
+import java.io.Serializable;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.mule.util.lock.Lock;
+import org.mule.util.lock.LockFactory;
 import org.mule.util.store.ObjectStorePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +53,7 @@
     private String messageDigestAlgorithm;
     private String idExpression;
     private ObjectStore<AtomicInteger> store;
+    private Lock<Serializable> lock;
 
     @Override
     public void initialise() throws InitialisationException
@@ -95,6 +99,11 @@
             }
         }
 
+        String appName = muleContext.getConfiguration().getId();
+        String flowName = flowConstruct.getName();
+        String idrId = String.format("%s-%s-%s",appName,flowName,"idr");
+        lock = ((LockFactory<Serializable>)muleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).createLock(idrId);
+
         store = createStore();
     }
 
@@ -165,45 +174,55 @@
             exceptionSeen = true;
         }
 
-        if (!exceptionSeen)
+        lock.lock(messageId);
+        try
         {
-            counter = findCounter(messageId);
-            tooMany = counter != null && counter.get() > maxRedeliveryCount;
-        }
 
-        if (tooMany || exceptionSeen)
-        {
-            try
+            if (!exceptionSeen)
             {
-                return deadLetterQueue.process(event);
+                counter = findCounter(messageId);
+                tooMany = counter != null && counter.get() > maxRedeliveryCount;
             }
-            catch (Exception ex)
+
+            if (tooMany || exceptionSeen)
             {
-                logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                try
+                {
+                    return deadLetterQueue.process(event);
+                }
+                catch (Exception ex)
+                {
+                    logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                }
+                return null;
             }
-            return null;
-        }
 
-        try
-        {
-            MuleEvent returnEvent = processNext(event);
-            counter = findCounter(messageId);
-            if (counter != null)
+            try
             {
-                resetCounter(messageId);
+                MuleEvent returnEvent = processNext(event);
+                counter = findCounter(messageId);
+                if (counter != null)
+                {
+                    resetCounter(messageId);
+                }
+                return returnEvent;
             }
-            return returnEvent;
+            catch (MuleException ex)
+            {
+                incrementCounter(messageId);
+                throw ex;
+            }
+            catch (RuntimeException ex)
+            {
+                incrementCounter(messageId);
+                throw ex;
+            }
         }
-        catch (MuleException ex)
+        finally
         {
-            incrementCounter(messageId);
-            throw ex;
+            lock.unlock(messageId);
         }
-        catch (RuntimeException ex)
-        {
-            incrementCounter(messageId);
-            throw ex;
-        }
+
     }
 
     private void resetCounter(String messageId) throws ObjectStoreException

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,30 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.mule.api.lifecycle.Disposable;
+
+/**
+ * Interface to provide a locking mechanism to use in mule components
+ */
+public interface Lock<T> extends Disposable
+{
+
+    /*
+     * Gets a lock over the resource identified with lockId
+     */
+    void lock(T lockId);
+
+    /*
+     * Releases lock over the resource identified with lockId
+     */
+    void unlock(T lockId);
+    
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,25 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+/**
+ * Factory for creating Lock instances.
+ *
+ * Default LockFactory can be override by modules using registry-bootstrap.
+ */
+public interface LockFactory<T>
+{
+
+    /**
+     * Creates a Lock for a given resource using the resource unique identifier.
+     */
+    Lock<T> createLock(String lockResourceName);
+    
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,104 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Default implementation of the Lock interface. Useful for doing locking in a single mule instance.
+ */
+public class ServerLock<T> implements Lock<T>
+{
+    private Map<T, LockEntry> locks;
+    private Object acquireLock = new Object();
+
+    public ServerLock()
+    {
+        this.locks = new HashMap<T,LockEntry>();
+    }
+
+    public void lock(T key)
+    {
+        LockEntry lock;
+        synchronized (acquireLock)
+        {
+            if (this.locks.containsKey(key))
+            {
+                lock = this.locks.get(key);
+            }
+            else
+            {
+                lock = new LockEntry();
+                this.locks.put(key,lock);
+            }
+            lock.incrementLockCount();
+            acquireLock.notifyAll();
+        }
+        lock.lock();
+    }
+
+    public void unlock(T key)
+    {
+        synchronized (acquireLock)
+        {
+            LockEntry lock = this.locks.get(key);
+            if (lock != null)
+            {
+                lock.decrementLockCount();
+                if (!lock.hasPendingLocks())
+                {
+                    this.locks.remove(key);
+                }
+                lock.unlock();
+            }
+            acquireLock.notifyAll();
+        }
+    }
+
+    public static class LockEntry
+    {
+        private AtomicInteger lockCount  = new AtomicInteger(0);
+        private ReentrantLock lock = new ReentrantLock(true);
+
+        public void lock()
+        {
+            lock.lock();
+        }
+
+        public void incrementLockCount()
+        {
+            lockCount.incrementAndGet();
+        }
+
+        public void decrementLockCount()
+        {
+            lockCount.decrementAndGet();
+        }
+
+        public void unlock()
+        {
+            lock.unlock();
+        }
+
+        public boolean hasPendingLocks()
+        {
+            return lockCount.get() > 0;
+        }
+    }
+
+    @Override
+    public void dispose()
+    {
+        locks.clear();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,19 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+public class ServerLockFactory<T> implements LockFactory<T>
+{
+    @Override
+    public Lock<T> createLock(String lockResourceName)
+    {
+        return new ServerLock<T>();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -10,71 +10,97 @@
 
 package org.mule.processor;
 
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
+import junit.framework.Assert;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
 import org.mockito.Answers;
-import org.mockito.Mockito;
 import org.mockito.internal.verification.VerificationModeFactory;
-import org.mule.api.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
 import org.mule.api.config.MuleProperties;
 import org.mule.api.construct.FlowConstruct;
 import org.mule.api.processor.MessageProcessor;
 import org.mule.api.store.ObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
-import org.mule.routing.MessageProcessorFilterPair;
 import org.mule.tck.junit4.AbstractMuleTestCase;
-
-import org.junit.Test;
-
-import junit.framework.Assert;
+import org.mule.tck.junit4.rule.SystemProperty;
 import org.mule.util.SerializationUtils;
+import org.mule.util.concurrent.Latch;
+import org.mule.util.lock.ServerLock;
+import org.mule.util.lock.ServerLockFactory;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
 {
 
     public static final String STRING_MESSAGE = "message";
-    public static final int MAX_REDELIVERY_COUNT = 1;
+    public static final int MAX_REDELIVERY_COUNT = 0;
     private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
     private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
     private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
-    private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+    private Latch waitLatch = new Latch();
+    private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+    private final IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
 
+    @Rule
+    public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+
     @Before
     public void setUpTest() throws MuleException
     {
         when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
-        System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+        when(mockWaitingMessageProcessor.process(event)).thenAnswer(new Answer<MuleEvent>()
+        {
+            @Override
+            public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                waitingMessageProcessorExecutionLatch.countDown();
+                waitLatch.await(2000, TimeUnit.MILLISECONDS);
+                return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+            }
+        });
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).thenReturn(new ServerLockFactory());
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+        InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+        when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+        when(event.getMessage()).thenReturn(message);
+        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+        irp.setUseSecureHash(true);
+        irp.setFlowConstruct(mock(FlowConstruct.class));
+        irp.setMuleContext(mockMuleContext);
+        irp.setListener(mockFailingMessageProcessor);
+        irp.setMessageProcessor(mockDlqMessageProcessor);
+
     }
 
     @Test
     public void messageDigestFailure() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
+        when(message.getPayload()).thenReturn(new Object());
         irp.initialise();
-
-
-        when(message.getPayload()).thenReturn(new Object());
-
-        when(event.getMessage()).thenReturn(message);
         MuleEvent process = irp.process(event);
         Assert.assertNull(process);
     }
@@ -82,22 +108,42 @@
     @Test
     public void testMessageRedeliveryUsingMemory() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
-        irp.setUseSecureHash(true);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
+    @Test
+    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    {
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
         irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
+    @Test
+    public void testThreadSafeObjectStoreUsage() throws Exception
+    {
         when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
+        irp.setListener(mockWaitingMessageProcessor);
+        irp.initialise();
 
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+        firstIrpExecutionThread.start();
+        ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+        threadCausingRedeliveryException.start();
+        waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+        waitLatch.release();
+        firstIrpExecutionThread.join();
+        threadCausingRedeliveryException.join();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
+
+    private void processUntilFailure()
+    {
+        for (int i = 0; i < MAX_REDELIVERY_COUNT + 2; i++)
         {
             try
             {
@@ -107,28 +153,14 @@
             {
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
 
-    @Test
-    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    public class ExecuteIrpThread extends Thread
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
-        irp.initialise();
-
-        when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
-
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        public Exception exception;
+        
+        @Override
+        public void run()
         {
             try
             {
@@ -136,15 +168,18 @@
             }
             catch (Exception e)
             {
+                exception = e;
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
+   
+    
 
     public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
     {
 
         private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+        private ServerLock lockableObjectStore = new ServerLock();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -177,11 +212,13 @@
         {
             return false;
         }
+
     }
 
     public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
     {
         private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+        private ServerLock lockableObjectStore = new ServerLock();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -212,6 +249,7 @@
         {
             return false;
         }
+
     }
 
 }

Added: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (0 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,90 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+    private final String name;
+    private String value;
+    private boolean initialized;
+    private String oldValue;
+
+    public SystemProperty(String name)
+    {
+        this(name, null);
+    }
+
+    public SystemProperty(String name, String value)
+    {
+        this.name = name;
+        this.value = value;
+    }
+
+    @Override
+    protected void before() throws Throwable
+    {
+        if (initialized)
+        {
+            throw new IllegalArgumentException("System property was already initialized");
+        }
+
+        oldValue = System.setProperty(name, getValue());
+        initialized = true;
+    }
+
+    @Override
+    protected void after()
+    {
+        if (!initialized)
+        {
+            throw new IllegalArgumentException("System property was not initialized");
+        }
+
+        doCleanUp();
+        restoreOldValue();
+
+        initialized = false;
+    }
+
+    protected void restoreOldValue()
+    {
+        if (oldValue == null)
+        {
+            System.clearProperty(name);
+        }
+        else
+        {
+            System.setProperty(name, oldValue);
+        }
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    protected void doCleanUp()
+    {
+        // Nothing to do
+    };
+
+    public String getValue()
+    {
+        return value;
+    };
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (0 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,151 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class ServerLockTestCase extends AbstractMuleTestCase
+{
+    public static final int THREAD_COUNT = 100;
+    public static final int ITERATIONS_PER_THREAD = 100;
+    private Latch threadStartLatch = new Latch();
+    private String sharedKeyA = "A";
+    private String sharedKeyB = "B";
+    private ServerLock<String> serverLock = new ServerLock<String>();
+    private InMemoryObjectStore objectStore  = new InMemoryObjectStore();
+
+    @Test
+    public void testHighConcurrency() throws Exception
+    {
+        List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+        for (int i = 0; i < THREAD_COUNT; i++)
+        {
+            IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+            incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+        }
+        threadStartLatch.release();
+        for (Thread thread : threads)
+        {
+            thread.join();
+        }
+        assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+        assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+    }
+    
+    public class IncrementKeyValueThread extends Thread
+    {
+        private String key;
+
+        public IncrementKeyValueThread(String key)
+        {
+            super("Thread-" + key);
+            this.key = key;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+                for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+                {
+                    if (Thread.interrupted())
+                    {
+                        break;
+                    }
+                    serverLock.lock(key);
+                    try
+                    {
+                        Integer value;
+                        if (objectStore.contains(key))
+                        {
+                            value = objectStore.retrieve(key);
+                            objectStore.remove(key);
+                        }
+                        else
+                        {
+                            value = 0;
+                        }
+                        objectStore.store(key,value + 1);
+                    }
+                    finally
+                    {
+                        serverLock.unlock(key);
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static class InMemoryObjectStore implements ObjectStore<Integer>
+    {
+        private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+        @Override
+        public boolean contains(Serializable key) throws ObjectStoreException
+        {
+            return store.containsKey(key);
+        }
+
+        @Override
+        public void store(Serializable key, Integer value) throws ObjectStoreException
+        {
+            if (store.containsKey(key))
+            {
+                throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+            }
+            store.put(key,value);
+        }
+
+        @Override
+        public Integer retrieve(Serializable key) throws ObjectStoreException
+        {
+            return store.get(key);
+        }
+
+        @Override
+        public Integer remove(Serializable key) throws ObjectStoreException
+        {
+            return store.remove(key);
+        }
+
+        @Override
+        public boolean isPersistent()
+        {
+            return false;
+        }
+    }
+
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml (24785 => 24786)


--- branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml	2012-08-21 05:18:14 UTC (rev 24786)
@@ -86,6 +86,8 @@
 
     <bean name="_defaultRetryPolicyTemplate" class="org.mule.retry.policies.NoRetryPolicyTemplate"/>
 
+    <bean name="_muleLockFactory" class="org.mule.util.lock.ServerLockFactory"/>
+
     <!-- Default Transformers are now loaded from META-INF/services/org/mule/config/registry-bootstrap.properties so that
     the transformers will be available even when using the TransientRegistry only -->
 

To unsubscribe from this list please visit:

http://xircles.codehaus.org/manage_email


Reply | Threaded
Open this post in threaded view
|

Re: [mule-scm] [mule][24786] branches/mule-3.2.x: MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy

Pablo La Greca
LockFactory was added to org.mule.config.builders.DefaultsConfigurationBuilder  as suggested.

IdempotentMessageFilter relies on the fact that storing twice the same key in an object store will fail. So it's not the same case.

On Tue, Aug 21, 2012 at 9:22 AM, Daniel Feist <[hidden email]> wrote:
Does IdempotentMessageFilter need the same?

Dan

On Aug 21, 2012, at 2:18 AM, [hidden email] wrote:

Revision
24786
Author
pablo.lagreca
Date
2012-08-21 00:18:14 -0500 (Tue, 21 Aug 2012)

Log Message

MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy

Modified Paths

Added Paths

Diff

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -148,6 +148,7 @@
     public static final String OBJECT_DEFAULT_RETRY_POLICY_TEMPLATE = "_defaultRetryPolicyTemplate";
     public static final String OBJECT_MULE_CONFIGURATION = "_muleConfiguration";
     public static final String OBJECT_MULE_NAMESPACE_MANAGER = "_muleNamespaceManager";
+    public static final String OBJECT_LOCK_FACTORY = "_muleLockFactory";
 
     // Not currently used as these need to be instance variables of the MuleContext.
     public static final String OBJECT_WORK_MANAGER = "_muleWorkManager";

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -49,7 +49,7 @@
     @Override
     public void initialise() throws InitialisationException
     {
-        if (maxRedeliveryCount < 1)
+        if (maxRedeliveryCount < 0)
         {
             throw new InitialisationException(
                 CoreMessages.initialisationFailure(

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -25,10 +25,13 @@
 import org.mule.transformer.simple.ObjectToByteArray;
 
 import java.io.InputStream;
+import java.io.Serializable;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.mule.util.lock.Lock;
+import org.mule.util.lock.LockFactory;
 import org.mule.util.store.ObjectStorePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +53,7 @@
     private String messageDigestAlgorithm;
     private String idExpression;
     private ObjectStore<AtomicInteger> store;
+    private Lock<Serializable> lock;
 
     @Override
     public void initialise() throws InitialisationException
@@ -95,6 +99,11 @@
             }
         }
 
+        String appName = muleContext.getConfiguration().getId();
+        String flowName = flowConstruct.getName();
+        String idrId = String.format("%s-%s-%s",appName,flowName,"idr");
+        lock = ((LockFactory<Serializable>)muleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).createLock(idrId);
+
         store = createStore();
     }
 
@@ -165,45 +174,55 @@
             exceptionSeen = true;
         }
 
-        if (!exceptionSeen)
+        lock.lock(messageId);
+        try
         {
-            counter = findCounter(messageId);
-            tooMany = counter != null && counter.get() > maxRedeliveryCount;
-        }
 
-        if (tooMany || exceptionSeen)
-        {
-            try
+            if (!exceptionSeen)
             {
-                return deadLetterQueue.process(event);
+                counter = findCounter(messageId);
+                tooMany = counter != null && counter.get() > maxRedeliveryCount;
             }
-            catch (Exception ex)
+
+            if (tooMany || exceptionSeen)
             {
-                logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                try
+                {
+                    return deadLetterQueue.process(event);
+                }
+                catch (Exception ex)
+                {
+                    logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                }
+                return null;
             }
-            return null;
-        }
 
-        try
-        {
-            MuleEvent returnEvent = processNext(event);
-            counter = findCounter(messageId);
-            if (counter != null)
+            try
             {
-                resetCounter(messageId);
+                MuleEvent returnEvent = processNext(event);
+                counter = findCounter(messageId);
+                if (counter != null)
+                {
+                    resetCounter(messageId);
+                }
+                return returnEvent;
             }
-            return returnEvent;
+            catch (MuleException ex)
+            {
+                incrementCounter(messageId);
+                throw ex;
+            }
+            catch (RuntimeException ex)
+            {
+                incrementCounter(messageId);
+                throw ex;
+            }
         }
-        catch (MuleException ex)
+        finally
         {
-            incrementCounter(messageId);
-            throw ex;
+            lock.unlock(messageId);
         }
-        catch (RuntimeException ex)
-        {
-            incrementCounter(messageId);
-            throw ex;
-        }
+
     }
 
     private void resetCounter(String messageId) throws ObjectStoreException

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,30 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.mule.api.lifecycle.Disposable;
+
+/**
+ * Interface to provide a locking mechanism to use in mule components
+ */
+public interface Lock<T> extends Disposable
+{
+
+    /*
+     * Gets a lock over the resource identified with lockId
+     */
+    void lock(T lockId);
+
+    /*
+     * Releases lock over the resource identified with lockId
+     */
+    void unlock(T lockId);
+    
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,25 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+/**
+ * Factory for creating Lock instances.
+ *
+ * Default LockFactory can be override by modules using registry-bootstrap.
+ */
+public interface LockFactory<T>
+{
+
+    /**
+     * Creates a Lock for a given resource using the resource unique identifier.
+     */
+    Lock<T> createLock(String lockResourceName);
+    
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,104 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Default implementation of the Lock interface. Useful for doing locking in a single mule instance.
+ */
+public class ServerLock<T> implements Lock<T>
+{
+    private Map<T, LockEntry> locks;
+    private Object acquireLock = new Object();
+
+    public ServerLock()
+    {
+        this.locks = new HashMap<T,LockEntry>();
+    }
+
+    public void lock(T key)
+    {
+        LockEntry lock;
+        synchronized (acquireLock)
+        {
+            if (this.locks.containsKey(key))
+            {
+                lock = this.locks.get(key);
+            }
+            else
+            {
+                lock = new LockEntry();
+                this.locks.put(key,lock);
+            }
+            lock.incrementLockCount();
+            acquireLock.notifyAll();
+        }
+        lock.lock();
+    }
+
+    public void unlock(T key)
+    {
+        synchronized (acquireLock)
+        {
+            LockEntry lock = this.locks.get(key);
+            if (lock != null)
+            {
+                lock.decrementLockCount();
+                if (!lock.hasPendingLocks())
+                {
+                    this.locks.remove(key);
+                }
+                lock.unlock();
+            }
+            acquireLock.notifyAll();
+        }
+    }
+
+    public static class LockEntry
+    {
+        private AtomicInteger lockCount  = new AtomicInteger(0);
+        private ReentrantLock lock = new ReentrantLock(true);
+
+        public void lock()
+        {
+            lock.lock();
+        }
+
+        public void incrementLockCount()
+        {
+            lockCount.incrementAndGet();
+        }
+
+        public void decrementLockCount()
+        {
+            lockCount.decrementAndGet();
+        }
+
+        public void unlock()
+        {
+            lock.unlock();
+        }
+
+        public boolean hasPendingLocks()
+        {
+            return lockCount.get() > 0;
+        }
+    }
+
+    @Override
+    public void dispose()
+    {
+        locks.clear();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,19 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+public class ServerLockFactory<T> implements LockFactory<T>
+{
+    @Override
+    public Lock<T> createLock(String lockResourceName)
+    {
+        return new ServerLock<T>();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -10,71 +10,97 @@
 
 package org.mule.processor;
 
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
+import junit.framework.Assert;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
 import org.mockito.Answers;
-import org.mockito.Mockito;
 import org.mockito.internal.verification.VerificationModeFactory;
-import org.mule.api.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
 import org.mule.api.config.MuleProperties;
 import org.mule.api.construct.FlowConstruct;
 import org.mule.api.processor.MessageProcessor;
 import org.mule.api.store.ObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
-import org.mule.routing.MessageProcessorFilterPair;
 import org.mule.tck.junit4.AbstractMuleTestCase;
-
-import org.junit.Test;
-
-import junit.framework.Assert;
+import org.mule.tck.junit4.rule.SystemProperty;
 import org.mule.util.SerializationUtils;
+import org.mule.util.concurrent.Latch;
+import org.mule.util.lock.ServerLock;
+import org.mule.util.lock.ServerLockFactory;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
 {
 
     public static final String STRING_MESSAGE = "message";
-    public static final int MAX_REDELIVERY_COUNT = 1;
+    public static final int MAX_REDELIVERY_COUNT = 0;
     private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
     private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
     private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
-    private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+    private Latch waitLatch = new Latch();
+    private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+    private final IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
 
+    @Rule
+    public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+
     @Before
     public void setUpTest() throws MuleException
     {
         when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
-        System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+        when(mockWaitingMessageProcessor.process(event)).thenAnswer(new Answer<MuleEvent>()
+        {
+            @Override
+            public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                waitingMessageProcessorExecutionLatch.countDown();
+                waitLatch.await(2000, TimeUnit.MILLISECONDS);
+                return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+            }
+        });
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).thenReturn(new ServerLockFactory());
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+        InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+        when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+        when(event.getMessage()).thenReturn(message);
+        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+        irp.setUseSecureHash(true);
+        irp.setFlowConstruct(mock(FlowConstruct.class));
+        irp.setMuleContext(mockMuleContext);
+        irp.setListener(mockFailingMessageProcessor);
+        irp.setMessageProcessor(mockDlqMessageProcessor);
+
     }
 
     @Test
     public void messageDigestFailure() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
+        when(message.getPayload()).thenReturn(new Object());
         irp.initialise();
-
-
-        when(message.getPayload()).thenReturn(new Object());
-
-        when(event.getMessage()).thenReturn(message);
         MuleEvent process = irp.process(event);
         Assert.assertNull(process);
     }
@@ -82,22 +108,42 @@
     @Test
     public void testMessageRedeliveryUsingMemory() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
-        irp.setUseSecureHash(true);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
+    @Test
+    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    {
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
         irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
+    @Test
+    public void testThreadSafeObjectStoreUsage() throws Exception
+    {
         when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
+        irp.setListener(mockWaitingMessageProcessor);
+        irp.initialise();
 
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+        firstIrpExecutionThread.start();
+        ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+        threadCausingRedeliveryException.start();
+        waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+        waitLatch.release();
+        firstIrpExecutionThread.join();
+        threadCausingRedeliveryException.join();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
+
+    private void processUntilFailure()
+    {
+        for (int i = 0; i < MAX_REDELIVERY_COUNT + 2; i++)
         {
             try
             {
@@ -107,28 +153,14 @@
             {
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
 
-    @Test
-    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    public class ExecuteIrpThread extends Thread
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
-        irp.initialise();
-
-        when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
-
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        public Exception exception;
+        
+        @Override
+        public void run()
         {
             try
             {
@@ -136,15 +168,18 @@
             }
             catch (Exception e)
             {
+                exception = e;
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
+   
+    
 
     public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
     {
 
         private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+        private ServerLock lockableObjectStore = new ServerLock();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -177,11 +212,13 @@
         {
             return false;
         }
+
     }
 
     public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
     {
         private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+        private ServerLock lockableObjectStore = new ServerLock();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -212,6 +249,7 @@
         {
             return false;
         }
+
     }
 
 }

Added: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (0 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,90 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+    private final String name;
+    private String value;
+    private boolean initialized;
+    private String oldValue;
+
+    public SystemProperty(String name)
+    {
+        this(name, null);
+    }
+
+    public SystemProperty(String name, String value)
+    {
+        this.name = name;
+        this.value = value;
+    }
+
+    @Override
+    protected void before() throws Throwable
+    {
+        if (initialized)
+        {
+            throw new IllegalArgumentException("System property was already initialized");
+        }
+
+        oldValue = System.setProperty(name, getValue());
+        initialized = true;
+    }
+
+    @Override
+    protected void after()
+    {
+        if (!initialized)
+        {
+            throw new IllegalArgumentException("System property was not initialized");
+        }
+
+        doCleanUp();
+        restoreOldValue();
+
+        initialized = false;
+    }
+
+    protected void restoreOldValue()
+    {
+        if (oldValue == null)
+        {
+            System.clearProperty(name);
+        }
+        else
+        {
+            System.setProperty(name, oldValue);
+        }
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    protected void doCleanUp()
+    {
+        // Nothing to do
+    };
+
+    public String getValue()
+    {
+        return value;
+    };
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (0 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,151 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class ServerLockTestCase extends AbstractMuleTestCase
+{
+    public static final int THREAD_COUNT = 100;
+    public static final int ITERATIONS_PER_THREAD = 100;
+    private Latch threadStartLatch = new Latch();
+    private String sharedKeyA = "A";
+    private String sharedKeyB = "B";
+    private ServerLock<String> serverLock = new ServerLock<String>();
+    private InMemoryObjectStore objectStore  = new InMemoryObjectStore();
+
+    @Test
+    public void testHighConcurrency() throws Exception
+    {
+        List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+        for (int i = 0; i < THREAD_COUNT; i++)
+        {
+            IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+            incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+        }
+        threadStartLatch.release();
+        for (Thread thread : threads)
+        {
+            thread.join();
+        }
+        assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+        assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+    }
+    
+    public class IncrementKeyValueThread extends Thread
+    {
+        private String key;
+
+        public IncrementKeyValueThread(String key)
+        {
+            super("Thread-" + key);
+            this.key = key;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+                for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+                {
+                    if (Thread.interrupted())
+                    {
+                        break;
+                    }
+                    serverLock.lock(key);
+                    try
+                    {
+                        Integer value;
+                        if (objectStore.contains(key))
+                        {
+                            value = objectStore.retrieve(key);
+                            objectStore.remove(key);
+                        }
+                        else
+                        {
+                            value = 0;
+                        }
+                        objectStore.store(key,value + 1);
+                    }
+                    finally
+                    {
+                        serverLock.unlock(key);
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static class InMemoryObjectStore implements ObjectStore<Integer>
+    {
+        private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+        @Override
+        public boolean contains(Serializable key) throws ObjectStoreException
+        {
+            return store.containsKey(key);
+        }
+
+        @Override
+        public void store(Serializable key, Integer value) throws ObjectStoreException
+        {
+            if (store.containsKey(key))
+            {
+                throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+            }
+            store.put(key,value);
+        }
+
+        @Override
+        public Integer retrieve(Serializable key) throws ObjectStoreException
+        {
+            return store.get(key);
+        }
+
+        @Override
+        public Integer remove(Serializable key) throws ObjectStoreException
+        {
+            return store.remove(key);
+        }
+
+        @Override
+        public boolean isPersistent()
+        {
+            return false;
+        }
+    }
+
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml (24785 => 24786)


--- branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml	2012-08-21 05:18:14 UTC (rev 24786)
@@ -86,6 +86,8 @@
 
     <bean name="_defaultRetryPolicyTemplate" class="org.mule.retry.policies.NoRetryPolicyTemplate"/>
 
+    <bean name="_muleLockFactory" class="org.mule.util.lock.ServerLockFactory"/>
+
     <!-- Default Transformers are now loaded from META-INF/services/org/mule/config/registry-bootstrap.properties so that
     the transformers will be available even when using the TransientRegistry only -->
 

To unsubscribe from this list please visit:

http://xircles.codehaus.org/manage_email



Reply | Threaded
Open this post in threaded view
|

Re: [mule-scm] [mule][24786] branches/mule-3.2.x: MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy

Pablo Kraan

Some comments:


Lock: I think the javadoc needs more information. What happens when there is no object with the given key? what happens if the caller to the release method does not have the lock on passed key?

LockFactory: is not clear to me what is the purpose of the lockResourceName

ServerLock: acquireLock field must be final

Remove redundant usages of "this" keyword

Fix imports order


Pablo K


On Tue, Aug 21, 2012 at 2:25 PM, Pablo La Greca <[hidden email]> wrote:
LockFactory was added to org.mule.config.builders.DefaultsConfigurationBuilder  as suggested.

IdempotentMessageFilter relies on the fact that storing twice the same key in an object store will fail. So it's not the same case.

On Tue, Aug 21, 2012 at 9:22 AM, Daniel Feist <[hidden email]> wrote:
Does IdempotentMessageFilter need the same?

Dan

On Aug 21, 2012, at 2:18 AM, [hidden email] wrote:

Revision
24786
Author
pablo.lagreca
Date
2012-08-21 00:18:14 -0500 (Tue, 21 Aug 2012)

Log Message

MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy

Modified Paths

Added Paths

Diff

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -148,6 +148,7 @@
     public static final String OBJECT_DEFAULT_RETRY_POLICY_TEMPLATE = "_defaultRetryPolicyTemplate";
     public static final String OBJECT_MULE_CONFIGURATION = "_muleConfiguration";
     public static final String OBJECT_MULE_NAMESPACE_MANAGER = "_muleNamespaceManager";
+    public static final String OBJECT_LOCK_FACTORY = "_muleLockFactory";
 
     // Not currently used as these need to be instance variables of the MuleContext.
     public static final String OBJECT_WORK_MANAGER = "_muleWorkManager";

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -49,7 +49,7 @@
     @Override
     public void initialise() throws InitialisationException
     {
-        if (maxRedeliveryCount < 1)
+        if (maxRedeliveryCount < 0)
         {
             throw new InitialisationException(
                 CoreMessages.initialisationFailure(

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -25,10 +25,13 @@
 import org.mule.transformer.simple.ObjectToByteArray;
 
 import java.io.InputStream;
+import java.io.Serializable;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.mule.util.lock.Lock;
+import org.mule.util.lock.LockFactory;
 import org.mule.util.store.ObjectStorePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +53,7 @@
     private String messageDigestAlgorithm;
     private String idExpression;
     private ObjectStore<AtomicInteger> store;
+    private Lock<Serializable> lock;
 
     @Override
     public void initialise() throws InitialisationException
@@ -95,6 +99,11 @@
             }
         }
 
+        String appName = muleContext.getConfiguration().getId();
+        String flowName = flowConstruct.getName();
+        String idrId = String.format("%s-%s-%s",appName,flowName,"idr");
+        lock = ((LockFactory<Serializable>)muleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).createLock(idrId);
+
         store = createStore();
     }
 
@@ -165,45 +174,55 @@
             exceptionSeen = true;
         }
 
-        if (!exceptionSeen)
+        lock.lock(messageId);
+        try
         {
-            counter = findCounter(messageId);
-            tooMany = counter != null && counter.get() > maxRedeliveryCount;
-        }
 
-        if (tooMany || exceptionSeen)
-        {
-            try
+            if (!exceptionSeen)
             {
-                return deadLetterQueue.process(event);
+                counter = findCounter(messageId);
+                tooMany = counter != null && counter.get() > maxRedeliveryCount;
             }
-            catch (Exception ex)
+
+            if (tooMany || exceptionSeen)
             {
-                logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                try
+                {
+                    return deadLetterQueue.process(event);
+                }
+                catch (Exception ex)
+                {
+                    logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                }
+                return null;
             }
-            return null;
-        }
 
-        try
-        {
-            MuleEvent returnEvent = processNext(event);
-            counter = findCounter(messageId);
-            if (counter != null)
+            try
             {
-                resetCounter(messageId);
+                MuleEvent returnEvent = processNext(event);
+                counter = findCounter(messageId);
+                if (counter != null)
+                {
+                    resetCounter(messageId);
+                }
+                return returnEvent;
             }
-            return returnEvent;
+            catch (MuleException ex)
+            {
+                incrementCounter(messageId);
+                throw ex;
+            }
+            catch (RuntimeException ex)
+            {
+                incrementCounter(messageId);
+                throw ex;
+            }
         }
-        catch (MuleException ex)
+        finally
         {
-            incrementCounter(messageId);
-            throw ex;
+            lock.unlock(messageId);
         }
-        catch (RuntimeException ex)
-        {
-            incrementCounter(messageId);
-            throw ex;
-        }
+
     }
 
     private void resetCounter(String messageId) throws ObjectStoreException

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,30 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.mule.api.lifecycle.Disposable;
+
+/**
+ * Interface to provide a locking mechanism to use in mule components
+ */
+public interface Lock<T> extends Disposable
+{
+
+    /*
+     * Gets a lock over the resource identified with lockId
+     */
+    void lock(T lockId);
+
+    /*
+     * Releases lock over the resource identified with lockId
+     */
+    void unlock(T lockId);
+    
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,25 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+/**
+ * Factory for creating Lock instances.
+ *
+ * Default LockFactory can be override by modules using registry-bootstrap.
+ */
+public interface LockFactory<T>
+{
+
+    /**
+     * Creates a Lock for a given resource using the resource unique identifier.
+     */
+    Lock<T> createLock(String lockResourceName);
+    
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,104 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Default implementation of the Lock interface. Useful for doing locking in a single mule instance.
+ */
+public class ServerLock<T> implements Lock<T>
+{
+    private Map<T, LockEntry> locks;
+    private Object acquireLock = new Object();
+
+    public ServerLock()
+    {
+        this.locks = new HashMap<T,LockEntry>();
+    }
+
+    public void lock(T key)
+    {
+        LockEntry lock;
+        synchronized (acquireLock)
+        {
+            if (this.locks.containsKey(key))
+            {
+                lock = this.locks.get(key);
+            }
+            else
+            {
+                lock = new LockEntry();
+                this.locks.put(key,lock);
+            }
+            lock.incrementLockCount();
+            acquireLock.notifyAll();
+        }
+        lock.lock();
+    }
+
+    public void unlock(T key)
+    {
+        synchronized (acquireLock)
+        {
+            LockEntry lock = this.locks.get(key);
+            if (lock != null)
+            {
+                lock.decrementLockCount();
+                if (!lock.hasPendingLocks())
+                {
+                    this.locks.remove(key);
+                }
+                lock.unlock();
+            }
+            acquireLock.notifyAll();
+        }
+    }
+
+    public static class LockEntry
+    {
+        private AtomicInteger lockCount  = new AtomicInteger(0);
+        private ReentrantLock lock = new ReentrantLock(true);
+
+        public void lock()
+        {
+            lock.lock();
+        }
+
+        public void incrementLockCount()
+        {
+            lockCount.incrementAndGet();
+        }
+
+        public void decrementLockCount()
+        {
+            lockCount.decrementAndGet();
+        }
+
+        public void unlock()
+        {
+            lock.unlock();
+        }
+
+        public boolean hasPendingLocks()
+        {
+            return lockCount.get() > 0;
+        }
+    }
+
+    @Override
+    public void dispose()
+    {
+        locks.clear();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,19 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+public class ServerLockFactory<T> implements LockFactory<T>
+{
+    @Override
+    public Lock<T> createLock(String lockResourceName)
+    {
+        return new ServerLock<T>();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -10,71 +10,97 @@
 
 package org.mule.processor;
 
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
+import junit.framework.Assert;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
 import org.mockito.Answers;
-import org.mockito.Mockito;
 import org.mockito.internal.verification.VerificationModeFactory;
-import org.mule.api.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
 import org.mule.api.config.MuleProperties;
 import org.mule.api.construct.FlowConstruct;
 import org.mule.api.processor.MessageProcessor;
 import org.mule.api.store.ObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
-import org.mule.routing.MessageProcessorFilterPair;
 import org.mule.tck.junit4.AbstractMuleTestCase;
-
-import org.junit.Test;
-
-import junit.framework.Assert;
+import org.mule.tck.junit4.rule.SystemProperty;
 import org.mule.util.SerializationUtils;
+import org.mule.util.concurrent.Latch;
+import org.mule.util.lock.ServerLock;
+import org.mule.util.lock.ServerLockFactory;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
 {
 
     public static final String STRING_MESSAGE = "message";
-    public static final int MAX_REDELIVERY_COUNT = 1;
+    public static final int MAX_REDELIVERY_COUNT = 0;
     private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
     private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
     private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
-    private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+    private Latch waitLatch = new Latch();
+    private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+    private final IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
 
+    @Rule
+    public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+
     @Before
     public void setUpTest() throws MuleException
     {
         when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
-        System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+        when(mockWaitingMessageProcessor.process(event)).thenAnswer(new Answer<MuleEvent>()
+        {
+            @Override
+            public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                waitingMessageProcessorExecutionLatch.countDown();
+                waitLatch.await(2000, TimeUnit.MILLISECONDS);
+                return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+            }
+        });
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).thenReturn(new ServerLockFactory());
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+        InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+        when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+        when(event.getMessage()).thenReturn(message);
+        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+        irp.setUseSecureHash(true);
+        irp.setFlowConstruct(mock(FlowConstruct.class));
+        irp.setMuleContext(mockMuleContext);
+        irp.setListener(mockFailingMessageProcessor);
+        irp.setMessageProcessor(mockDlqMessageProcessor);
+
     }
 
     @Test
     public void messageDigestFailure() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
+        when(message.getPayload()).thenReturn(new Object());
         irp.initialise();
-
-
-        when(message.getPayload()).thenReturn(new Object());
-
-        when(event.getMessage()).thenReturn(message);
         MuleEvent process = irp.process(event);
         Assert.assertNull(process);
     }
@@ -82,22 +108,42 @@
     @Test
     public void testMessageRedeliveryUsingMemory() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
-        irp.setUseSecureHash(true);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
+    @Test
+    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    {
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
         irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
+    @Test
+    public void testThreadSafeObjectStoreUsage() throws Exception
+    {
         when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
+        irp.setListener(mockWaitingMessageProcessor);
+        irp.initialise();
 
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+        firstIrpExecutionThread.start();
+        ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+        threadCausingRedeliveryException.start();
+        waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+        waitLatch.release();
+        firstIrpExecutionThread.join();
+        threadCausingRedeliveryException.join();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
+
+    private void processUntilFailure()
+    {
+        for (int i = 0; i < MAX_REDELIVERY_COUNT + 2; i++)
         {
             try
             {
@@ -107,28 +153,14 @@
             {
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
 
-    @Test
-    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    public class ExecuteIrpThread extends Thread
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
-        irp.initialise();
-
-        when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
-
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        public Exception exception;
+        
+        @Override
+        public void run()
         {
             try
             {
@@ -136,15 +168,18 @@
             }
             catch (Exception e)
             {
+                exception = e;
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
+   
+    
 
     public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
     {
 
         private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+        private ServerLock lockableObjectStore = new ServerLock();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -177,11 +212,13 @@
         {
             return false;
         }
+
     }
 
     public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
     {
         private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+        private ServerLock lockableObjectStore = new ServerLock();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -212,6 +249,7 @@
         {
             return false;
         }
+
     }
 
 }

Added: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (0 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,90 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+    private final String name;
+    private String value;
+    private boolean initialized;
+    private String oldValue;
+
+    public SystemProperty(String name)
+    {
+        this(name, null);
+    }
+
+    public SystemProperty(String name, String value)
+    {
+        this.name = name;
+        this.value = value;
+    }
+
+    @Override
+    protected void before() throws Throwable
+    {
+        if (initialized)
+        {
+            throw new IllegalArgumentException("System property was already initialized");
+        }
+
+        oldValue = System.setProperty(name, getValue());
+        initialized = true;
+    }
+
+    @Override
+    protected void after()
+    {
+        if (!initialized)
+        {
+            throw new IllegalArgumentException("System property was not initialized");
+        }
+
+        doCleanUp();
+        restoreOldValue();
+
+        initialized = false;
+    }
+
+    protected void restoreOldValue()
+    {
+        if (oldValue == null)
+        {
+            System.clearProperty(name);
+        }
+        else
+        {
+            System.setProperty(name, oldValue);
+        }
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    protected void doCleanUp()
+    {
+        // Nothing to do
+    };
+
+    public String getValue()
+    {
+        return value;
+    };
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (0 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,151 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class ServerLockTestCase extends AbstractMuleTestCase
+{
+    public static final int THREAD_COUNT = 100;
+    public static final int ITERATIONS_PER_THREAD = 100;
+    private Latch threadStartLatch = new Latch();
+    private String sharedKeyA = "A";
+    private String sharedKeyB = "B";
+    private ServerLock<String> serverLock = new ServerLock<String>();
+    private InMemoryObjectStore objectStore  = new InMemoryObjectStore();
+
+    @Test
+    public void testHighConcurrency() throws Exception
+    {
+        List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+        for (int i = 0; i < THREAD_COUNT; i++)
+        {
+            IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+            incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+        }
+        threadStartLatch.release();
+        for (Thread thread : threads)
+        {
+            thread.join();
+        }
+        assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+        assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+    }
+    
+    public class IncrementKeyValueThread extends Thread
+    {
+        private String key;
+
+        public IncrementKeyValueThread(String key)
+        {
+            super("Thread-" + key);
+            this.key = key;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+                for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+                {
+                    if (Thread.interrupted())
+                    {
+                        break;
+                    }
+                    serverLock.lock(key);
+                    try
+                    {
+                        Integer value;
+                        if (objectStore.contains(key))
+                        {
+                            value = objectStore.retrieve(key);
+                            objectStore.remove(key);
+                        }
+                        else
+                        {
+                            value = 0;
+                        }
+                        objectStore.store(key,value + 1);
+                    }
+                    finally
+                    {
+                        serverLock.unlock(key);
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static class InMemoryObjectStore implements ObjectStore<Integer>
+    {
+        private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+        @Override
+        public boolean contains(Serializable key) throws ObjectStoreException
+        {
+            return store.containsKey(key);
+        }
+
+        @Override
+        public void store(Serializable key, Integer value) throws ObjectStoreException
+        {
+            if (store.containsKey(key))
+            {
+                throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+            }
+            store.put(key,value);
+        }
+
+        @Override
+        public Integer retrieve(Serializable key) throws ObjectStoreException
+        {
+            return store.get(key);
+        }
+
+        @Override
+        public Integer remove(Serializable key) throws ObjectStoreException
+        {
+            return store.remove(key);
+        }
+
+        @Override
+        public boolean isPersistent()
+        {
+            return false;
+        }
+    }
+
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml (24785 => 24786)


--- branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml	2012-08-21 05:18:14 UTC (rev 24786)
@@ -86,6 +86,8 @@
 
     <bean name="_defaultRetryPolicyTemplate" class="org.mule.retry.policies.NoRetryPolicyTemplate"/>
 
+    <bean name="_muleLockFactory" class="org.mule.util.lock.ServerLockFactory"/>
+
     <!-- Default Transformers are now loaded from META-INF/services/org/mule/config/registry-bootstrap.properties so that
     the transformers will be available even when using the TransientRegistry only -->
 

To unsubscribe from this list please visit:

http://xircles.codehaus.org/manage_email




Reply | Threaded
Open this post in threaded view
|

Re: [mule-scm] [mule][24786] branches/mule-3.2.x: MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy

Pablo Kraan
Adding feedback from David Dossot:

>>I can't post to this mailing list :( otherwise I would have added:
>> - What happens if the lock holder dies? Is the lock eternal or does it time out?


On Tue, Aug 21, 2012 at 4:12 PM, Pablo Kraan <[hidden email]> wrote:

Some comments:


Lock: I think the javadoc needs more information. What happens when there is no object with the given key? what happens if the caller to the release method does not have the lock on passed key?

LockFactory: is not clear to me what is the purpose of the lockResourceName

ServerLock: acquireLock field must be final

Remove redundant usages of "this" keyword

Fix imports order


Pablo K


On Tue, Aug 21, 2012 at 2:25 PM, Pablo La Greca <[hidden email]> wrote:
LockFactory was added to org.mule.config.builders.DefaultsConfigurationBuilder  as suggested.

IdempotentMessageFilter relies on the fact that storing twice the same key in an object store will fail. So it's not the same case.

On Tue, Aug 21, 2012 at 9:22 AM, Daniel Feist <[hidden email]> wrote:
Does IdempotentMessageFilter need the same?

Dan

On Aug 21, 2012, at 2:18 AM, [hidden email] wrote:

Revision
24786
Author
pablo.lagreca
Date
2012-08-21 00:18:14 -0500 (Tue, 21 Aug 2012)

Log Message

MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy

Modified Paths

Added Paths

Diff

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -148,6 +148,7 @@
     public static final String OBJECT_DEFAULT_RETRY_POLICY_TEMPLATE = "_defaultRetryPolicyTemplate";
     public static final String OBJECT_MULE_CONFIGURATION = "_muleConfiguration";
     public static final String OBJECT_MULE_NAMESPACE_MANAGER = "_muleNamespaceManager";
+    public static final String OBJECT_LOCK_FACTORY = "_muleLockFactory";
 
     // Not currently used as these need to be instance variables of the MuleContext.
     public static final String OBJECT_WORK_MANAGER = "_muleWorkManager";

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -49,7 +49,7 @@
     @Override
     public void initialise() throws InitialisationException
     {
-        if (maxRedeliveryCount < 1)
+        if (maxRedeliveryCount < 0)
         {
             throw new InitialisationException(
                 CoreMessages.initialisationFailure(

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -25,10 +25,13 @@
 import org.mule.transformer.simple.ObjectToByteArray;
 
 import java.io.InputStream;
+import java.io.Serializable;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.mule.util.lock.Lock;
+import org.mule.util.lock.LockFactory;
 import org.mule.util.store.ObjectStorePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +53,7 @@
     private String messageDigestAlgorithm;
     private String idExpression;
     private ObjectStore<AtomicInteger> store;
+    private Lock<Serializable> lock;
 
     @Override
     public void initialise() throws InitialisationException
@@ -95,6 +99,11 @@
             }
         }
 
+        String appName = muleContext.getConfiguration().getId();
+        String flowName = flowConstruct.getName();
+        String idrId = String.format("%s-%s-%s",appName,flowName,"idr");
+        lock = ((LockFactory<Serializable>)muleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).createLock(idrId);
+
         store = createStore();
     }
 
@@ -165,45 +174,55 @@
             exceptionSeen = true;
         }
 
-        if (!exceptionSeen)
+        lock.lock(messageId);
+        try
         {
-            counter = findCounter(messageId);
-            tooMany = counter != null && counter.get() > maxRedeliveryCount;
-        }
 
-        if (tooMany || exceptionSeen)
-        {
-            try
+            if (!exceptionSeen)
             {
-                return deadLetterQueue.process(event);
+                counter = findCounter(messageId);
+                tooMany = counter != null && counter.get() > maxRedeliveryCount;
             }
-            catch (Exception ex)
+
+            if (tooMany || exceptionSeen)
             {
-                logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                try
+                {
+                    return deadLetterQueue.process(event);
+                }
+                catch (Exception ex)
+                {
+                    logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                }
+                return null;
             }
-            return null;
-        }
 
-        try
-        {
-            MuleEvent returnEvent = processNext(event);
-            counter = findCounter(messageId);
-            if (counter != null)
+            try
             {
-                resetCounter(messageId);
+                MuleEvent returnEvent = processNext(event);
+                counter = findCounter(messageId);
+                if (counter != null)
+                {
+                    resetCounter(messageId);
+                }
+                return returnEvent;
             }
-            return returnEvent;
+            catch (MuleException ex)
+            {
+                incrementCounter(messageId);
+                throw ex;
+            }
+            catch (RuntimeException ex)
+            {
+                incrementCounter(messageId);
+                throw ex;
+            }
         }
-        catch (MuleException ex)
+        finally
         {
-            incrementCounter(messageId);
-            throw ex;
+            lock.unlock(messageId);
         }
-        catch (RuntimeException ex)
-        {
-            incrementCounter(messageId);
-            throw ex;
-        }
+
     }
 
     private void resetCounter(String messageId) throws ObjectStoreException

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,30 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.mule.api.lifecycle.Disposable;
+
+/**
+ * Interface to provide a locking mechanism to use in mule components
+ */
+public interface Lock<T> extends Disposable
+{
+
+    /*
+     * Gets a lock over the resource identified with lockId
+     */
+    void lock(T lockId);
+
+    /*
+     * Releases lock over the resource identified with lockId
+     */
+    void unlock(T lockId);
+    
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,25 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+/**
+ * Factory for creating Lock instances.
+ *
+ * Default LockFactory can be override by modules using registry-bootstrap.
+ */
+public interface LockFactory<T>
+{
+
+    /**
+     * Creates a Lock for a given resource using the resource unique identifier.
+     */
+    Lock<T> createLock(String lockResourceName);
+    
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,104 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Default implementation of the Lock interface. Useful for doing locking in a single mule instance.
+ */
+public class ServerLock<T> implements Lock<T>
+{
+    private Map<T, LockEntry> locks;
+    private Object acquireLock = new Object();
+
+    public ServerLock()
+    {
+        this.locks = new HashMap<T,LockEntry>();
+    }
+
+    public void lock(T key)
+    {
+        LockEntry lock;
+        synchronized (acquireLock)
+        {
+            if (this.locks.containsKey(key))
+            {
+                lock = this.locks.get(key);
+            }
+            else
+            {
+                lock = new LockEntry();
+                this.locks.put(key,lock);
+            }
+            lock.incrementLockCount();
+            acquireLock.notifyAll();
+        }
+        lock.lock();
+    }
+
+    public void unlock(T key)
+    {
+        synchronized (acquireLock)
+        {
+            LockEntry lock = this.locks.get(key);
+            if (lock != null)
+            {
+                lock.decrementLockCount();
+                if (!lock.hasPendingLocks())
+                {
+                    this.locks.remove(key);
+                }
+                lock.unlock();
+            }
+            acquireLock.notifyAll();
+        }
+    }
+
+    public static class LockEntry
+    {
+        private AtomicInteger lockCount  = new AtomicInteger(0);
+        private ReentrantLock lock = new ReentrantLock(true);
+
+        public void lock()
+        {
+            lock.lock();
+        }
+
+        public void incrementLockCount()
+        {
+            lockCount.incrementAndGet();
+        }
+
+        public void decrementLockCount()
+        {
+            lockCount.decrementAndGet();
+        }
+
+        public void unlock()
+        {
+            lock.unlock();
+        }
+
+        public boolean hasPendingLocks()
+        {
+            return lockCount.get() > 0;
+        }
+    }
+
+    @Override
+    public void dispose()
+    {
+        locks.clear();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,19 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+public class ServerLockFactory<T> implements LockFactory<T>
+{
+    @Override
+    public Lock<T> createLock(String lockResourceName)
+    {
+        return new ServerLock<T>();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -10,71 +10,97 @@
 
 package org.mule.processor;
 
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
+import junit.framework.Assert;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
 import org.mockito.Answers;
-import org.mockito.Mockito;
 import org.mockito.internal.verification.VerificationModeFactory;
-import org.mule.api.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
 import org.mule.api.config.MuleProperties;
 import org.mule.api.construct.FlowConstruct;
 import org.mule.api.processor.MessageProcessor;
 import org.mule.api.store.ObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
-import org.mule.routing.MessageProcessorFilterPair;
 import org.mule.tck.junit4.AbstractMuleTestCase;
-
-import org.junit.Test;
-
-import junit.framework.Assert;
+import org.mule.tck.junit4.rule.SystemProperty;
 import org.mule.util.SerializationUtils;
+import org.mule.util.concurrent.Latch;
+import org.mule.util.lock.ServerLock;
+import org.mule.util.lock.ServerLockFactory;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
 {
 
     public static final String STRING_MESSAGE = "message";
-    public static final int MAX_REDELIVERY_COUNT = 1;
+    public static final int MAX_REDELIVERY_COUNT = 0;
     private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
     private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
     private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
-    private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+    private Latch waitLatch = new Latch();
+    private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+    private final IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
 
+    @Rule
+    public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+
     @Before
     public void setUpTest() throws MuleException
     {
         when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
-        System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+        when(mockWaitingMessageProcessor.process(event)).thenAnswer(new Answer<MuleEvent>()
+        {
+            @Override
+            public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                waitingMessageProcessorExecutionLatch.countDown();
+                waitLatch.await(2000, TimeUnit.MILLISECONDS);
+                return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+            }
+        });
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).thenReturn(new ServerLockFactory());
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+        InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+        when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+        when(event.getMessage()).thenReturn(message);
+        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+        irp.setUseSecureHash(true);
+        irp.setFlowConstruct(mock(FlowConstruct.class));
+        irp.setMuleContext(mockMuleContext);
+        irp.setListener(mockFailingMessageProcessor);
+        irp.setMessageProcessor(mockDlqMessageProcessor);
+
     }
 
     @Test
     public void messageDigestFailure() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
+        when(message.getPayload()).thenReturn(new Object());
         irp.initialise();
-
-
-        when(message.getPayload()).thenReturn(new Object());
-
-        when(event.getMessage()).thenReturn(message);
         MuleEvent process = irp.process(event);
         Assert.assertNull(process);
     }
@@ -82,22 +108,42 @@
     @Test
     public void testMessageRedeliveryUsingMemory() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
-        irp.setUseSecureHash(true);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
+    @Test
+    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    {
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
         irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
+    @Test
+    public void testThreadSafeObjectStoreUsage() throws Exception
+    {
         when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
+        irp.setListener(mockWaitingMessageProcessor);
+        irp.initialise();
 
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+        firstIrpExecutionThread.start();
+        ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+        threadCausingRedeliveryException.start();
+        waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+        waitLatch.release();
+        firstIrpExecutionThread.join();
+        threadCausingRedeliveryException.join();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
+
+    private void processUntilFailure()
+    {
+        for (int i = 0; i < MAX_REDELIVERY_COUNT + 2; i++)
         {
             try
             {
@@ -107,28 +153,14 @@
             {
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
 
-    @Test
-    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    public class ExecuteIrpThread extends Thread
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
-        irp.initialise();
-
-        when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
-
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        public Exception exception;
+        
+        @Override
+        public void run()
         {
             try
             {
@@ -136,15 +168,18 @@
             }
             catch (Exception e)
             {
+                exception = e;
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
+   
+    
 
     public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
     {
 
         private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+        private ServerLock lockableObjectStore = new ServerLock();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -177,11 +212,13 @@
         {
             return false;
         }
+
     }
 
     public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
     {
         private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+        private ServerLock lockableObjectStore = new ServerLock();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -212,6 +249,7 @@
         {
             return false;
         }
+
     }
 
 }

Added: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (0 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,90 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+    private final String name;
+    private String value;
+    private boolean initialized;
+    private String oldValue;
+
+    public SystemProperty(String name)
+    {
+        this(name, null);
+    }
+
+    public SystemProperty(String name, String value)
+    {
+        this.name = name;
+        this.value = value;
+    }
+
+    @Override
+    protected void before() throws Throwable
+    {
+        if (initialized)
+        {
+            throw new IllegalArgumentException("System property was already initialized");
+        }
+
+        oldValue = System.setProperty(name, getValue());
+        initialized = true;
+    }
+
+    @Override
+    protected void after()
+    {
+        if (!initialized)
+        {
+            throw new IllegalArgumentException("System property was not initialized");
+        }
+
+        doCleanUp();
+        restoreOldValue();
+
+        initialized = false;
+    }
+
+    protected void restoreOldValue()
+    {
+        if (oldValue == null)
+        {
+            System.clearProperty(name);
+        }
+        else
+        {
+            System.setProperty(name, oldValue);
+        }
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    protected void doCleanUp()
+    {
+        // Nothing to do
+    };
+
+    public String getValue()
+    {
+        return value;
+    };
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (0 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,151 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class ServerLockTestCase extends AbstractMuleTestCase
+{
+    public static final int THREAD_COUNT = 100;
+    public static final int ITERATIONS_PER_THREAD = 100;
+    private Latch threadStartLatch = new Latch();
+    private String sharedKeyA = "A";
+    private String sharedKeyB = "B";
+    private ServerLock<String> serverLock = new ServerLock<String>();
+    private InMemoryObjectStore objectStore  = new InMemoryObjectStore();
+
+    @Test
+    public void testHighConcurrency() throws Exception
+    {
+        List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+        for (int i = 0; i < THREAD_COUNT; i++)
+        {
+            IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+            incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+        }
+        threadStartLatch.release();
+        for (Thread thread : threads)
+        {
+            thread.join();
+        }
+        assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+        assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+    }
+    
+    public class IncrementKeyValueThread extends Thread
+    {
+        private String key;
+
+        public IncrementKeyValueThread(String key)
+        {
+            super("Thread-" + key);
+            this.key = key;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+                for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+                {
+                    if (Thread.interrupted())
+                    {
+                        break;
+                    }
+                    serverLock.lock(key);
+                    try
+                    {
+                        Integer value;
+                        if (objectStore.contains(key))
+                        {
+                            value = objectStore.retrieve(key);
+                            objectStore.remove(key);
+                        }
+                        else
+                        {
+                            value = 0;
+                        }
+                        objectStore.store(key,value + 1);
+                    }
+                    finally
+                    {
+                        serverLock.unlock(key);
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static class InMemoryObjectStore implements ObjectStore<Integer>
+    {
+        private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+        @Override
+        public boolean contains(Serializable key) throws ObjectStoreException
+        {
+            return store.containsKey(key);
+        }
+
+        @Override
+        public void store(Serializable key, Integer value) throws ObjectStoreException
+        {
+            if (store.containsKey(key))
+            {
+                throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+            }
+            store.put(key,value);
+        }
+
+        @Override
+        public Integer retrieve(Serializable key) throws ObjectStoreException
+        {
+            return store.get(key);
+        }
+
+        @Override
+        public Integer remove(Serializable key) throws ObjectStoreException
+        {
+            return store.remove(key);
+        }
+
+        @Override
+        public boolean isPersistent()
+        {
+            return false;
+        }
+    }
+
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml (24785 => 24786)


--- branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml	2012-08-21 05:18:14 UTC (rev 24786)
@@ -86,6 +86,8 @@
 
     <bean name="_defaultRetryPolicyTemplate" class="org.mule.retry.policies.NoRetryPolicyTemplate"/>
 
+    <bean name="_muleLockFactory" class="org.mule.util.lock.ServerLockFactory"/>
+
     <!-- Default Transformers are now loaded from META-INF/services/org/mule/config/registry-bootstrap.properties so that
     the transformers will be available even when using the TransientRegistry only -->
 

To unsubscribe from this list please visit:

http://xircles.codehaus.org/manage_email





Reply | Threaded
Open this post in threaded view
|

Re: [mule-scm] [mule][24786] branches/mule-3.2.x: MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy

Pablo La Greca
"What happens if the lock holder dies? Is the lock eternal or does it time out?" -> If it's in a cluster then it will be release, if it's in a single mule instance then lock client must ensure proper usage through the use of finally block.

"Lock: I think the javadoc needs more information. What happens when there is no object with the given key? what happens if the caller to the release method does not have the lock on passed key?" -> Will improve javadoc if required but there's no information around non existent key since it's not an object store. It's just a locking mechanism were a lock can be identified using any object.

"LockFactory: is not clear to me what is the purpose of the lockResourceName" -> Will improve javadoc

"ServerLock: acquireLock field must be final" -> Will do

"Remove redundant usages of "this" keyword" -> Don't think it's redundant. It's a matter of preference of usage.

"Fix imports order" -> Will do.



On Tue, Aug 21, 2012 at 4:19 PM, Pablo Kraan <[hidden email]> wrote:
Adding feedback from David Dossot:

>>I can't post to this mailing list :( otherwise I would have added:
>> - What happens if the lock holder dies? Is the lock eternal or does it time out?


On Tue, Aug 21, 2012 at 4:12 PM, Pablo Kraan <[hidden email]> wrote:

Some comments:


Lock: I think the javadoc needs more information. What happens when there is no object with the given key? what happens if the caller to the release method does not have the lock on passed key?

LockFactory: is not clear to me what is the purpose of the lockResourceName

ServerLock: acquireLock field must be final

Remove redundant usages of "this" keyword

Fix imports order


Pablo K


On Tue, Aug 21, 2012 at 2:25 PM, Pablo La Greca <[hidden email]> wrote:
LockFactory was added to org.mule.config.builders.DefaultsConfigurationBuilder  as suggested.

IdempotentMessageFilter relies on the fact that storing twice the same key in an object store will fail. So it's not the same case.

On Tue, Aug 21, 2012 at 9:22 AM, Daniel Feist <[hidden email]> wrote:
Does IdempotentMessageFilter need the same?

Dan

On Aug 21, 2012, at 2:18 AM, [hidden email] wrote:

Revision
24786
Author
pablo.lagreca
Date
2012-08-21 00:18:14 -0500 (Tue, 21 Aug 2012)

Log Message

MULE-6403 - adding basic locking mechanism and applying it to IdempotentRedeliveryPolicy

Modified Paths

Added Paths

Diff

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -148,6 +148,7 @@
     public static final String OBJECT_DEFAULT_RETRY_POLICY_TEMPLATE = "_defaultRetryPolicyTemplate";
     public static final String OBJECT_MULE_CONFIGURATION = "_muleConfiguration";
     public static final String OBJECT_MULE_NAMESPACE_MANAGER = "_muleNamespaceManager";
+    public static final String OBJECT_LOCK_FACTORY = "_muleLockFactory";
 
     // Not currently used as these need to be instance variables of the MuleContext.
     public static final String OBJECT_WORK_MANAGER = "_muleWorkManager";

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -49,7 +49,7 @@
     @Override
     public void initialise() throws InitialisationException
     {
-        if (maxRedeliveryCount < 1)
+        if (maxRedeliveryCount < 0)
         {
             throw new InitialisationException(
                 CoreMessages.initialisationFailure(

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -25,10 +25,13 @@
 import org.mule.transformer.simple.ObjectToByteArray;
 
 import java.io.InputStream;
+import java.io.Serializable;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.mule.util.lock.Lock;
+import org.mule.util.lock.LockFactory;
 import org.mule.util.store.ObjectStorePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +53,7 @@
     private String messageDigestAlgorithm;
     private String idExpression;
     private ObjectStore<AtomicInteger> store;
+    private Lock<Serializable> lock;
 
     @Override
     public void initialise() throws InitialisationException
@@ -95,6 +99,11 @@
             }
         }
 
+        String appName = muleContext.getConfiguration().getId();
+        String flowName = flowConstruct.getName();
+        String idrId = String.format("%s-%s-%s",appName,flowName,"idr");
+        lock = ((LockFactory<Serializable>)muleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).createLock(idrId);
+
         store = createStore();
     }
 
@@ -165,45 +174,55 @@
             exceptionSeen = true;
         }
 
-        if (!exceptionSeen)
+        lock.lock(messageId);
+        try
         {
-            counter = findCounter(messageId);
-            tooMany = counter != null && counter.get() > maxRedeliveryCount;
-        }
 
-        if (tooMany || exceptionSeen)
-        {
-            try
+            if (!exceptionSeen)
             {
-                return deadLetterQueue.process(event);
+                counter = findCounter(messageId);
+                tooMany = counter != null && counter.get() > maxRedeliveryCount;
             }
-            catch (Exception ex)
+
+            if (tooMany || exceptionSeen)
             {
-                logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                try
+                {
+                    return deadLetterQueue.process(event);
+                }
+                catch (Exception ex)
+                {
+                    logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                }
+                return null;
             }
-            return null;
-        }
 
-        try
-        {
-            MuleEvent returnEvent = processNext(event);
-            counter = findCounter(messageId);
-            if (counter != null)
+            try
             {
-                resetCounter(messageId);
+                MuleEvent returnEvent = processNext(event);
+                counter = findCounter(messageId);
+                if (counter != null)
+                {
+                    resetCounter(messageId);
+                }
+                return returnEvent;
             }
-            return returnEvent;
+            catch (MuleException ex)
+            {
+                incrementCounter(messageId);
+                throw ex;
+            }
+            catch (RuntimeException ex)
+            {
+                incrementCounter(messageId);
+                throw ex;
+            }
         }
-        catch (MuleException ex)
+        finally
         {
-            incrementCounter(messageId);
-            throw ex;
+            lock.unlock(messageId);
         }
-        catch (RuntimeException ex)
-        {
-            incrementCounter(messageId);
-            throw ex;
-        }
+
     }
 
     private void resetCounter(String messageId) throws ObjectStoreException

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,30 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.mule.api.lifecycle.Disposable;
+
+/**
+ * Interface to provide a locking mechanism to use in mule components
+ */
+public interface Lock<T> extends Disposable
+{
+
+    /*
+     * Gets a lock over the resource identified with lockId
+     */
+    void lock(T lockId);
+
+    /*
+     * Releases lock over the resource identified with lockId
+     */
+    void unlock(T lockId);
+    
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/Lock.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,25 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+/**
+ * Factory for creating Lock instances.
+ *
+ * Default LockFactory can be override by modules using registry-bootstrap.
+ */
+public interface LockFactory<T>
+{
+
+    /**
+     * Creates a Lock for a given resource using the resource unique identifier.
+     */
+    Lock<T> createLock(String lockResourceName);
+    
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/LockFactory.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,104 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Default implementation of the Lock interface. Useful for doing locking in a single mule instance.
+ */
+public class ServerLock<T> implements Lock<T>
+{
+    private Map<T, LockEntry> locks;
+    private Object acquireLock = new Object();
+
+    public ServerLock()
+    {
+        this.locks = new HashMap<T,LockEntry>();
+    }
+
+    public void lock(T key)
+    {
+        LockEntry lock;
+        synchronized (acquireLock)
+        {
+            if (this.locks.containsKey(key))
+            {
+                lock = this.locks.get(key);
+            }
+            else
+            {
+                lock = new LockEntry();
+                this.locks.put(key,lock);
+            }
+            lock.incrementLockCount();
+            acquireLock.notifyAll();
+        }
+        lock.lock();
+    }
+
+    public void unlock(T key)
+    {
+        synchronized (acquireLock)
+        {
+            LockEntry lock = this.locks.get(key);
+            if (lock != null)
+            {
+                lock.decrementLockCount();
+                if (!lock.hasPendingLocks())
+                {
+                    this.locks.remove(key);
+                }
+                lock.unlock();
+            }
+            acquireLock.notifyAll();
+        }
+    }
+
+    public static class LockEntry
+    {
+        private AtomicInteger lockCount  = new AtomicInteger(0);
+        private ReentrantLock lock = new ReentrantLock(true);
+
+        public void lock()
+        {
+            lock.lock();
+        }
+
+        public void incrementLockCount()
+        {
+            lockCount.incrementAndGet();
+        }
+
+        public void decrementLockCount()
+        {
+            lockCount.decrementAndGet();
+        }
+
+        public void unlock()
+        {
+            lock.unlock();
+        }
+
+        public boolean hasPendingLocks()
+        {
+            return lockCount.get() > 0;
+        }
+    }
+
+    @Override
+    public void dispose()
+    {
+        locks.clear();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLock.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java (0 => 24786)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,19 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+public class ServerLockFactory<T> implements LockFactory<T>
+{
+    @Override
+    public Lock<T> createLock(String lockResourceName)
+    {
+        return new ServerLock<T>();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/lock/ServerLockFactory.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (24785 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -10,71 +10,97 @@
 
 package org.mule.processor;
 
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
+import junit.framework.Assert;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
 import org.mockito.Answers;
-import org.mockito.Mockito;
 import org.mockito.internal.verification.VerificationModeFactory;
-import org.mule.api.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
 import org.mule.api.config.MuleProperties;
 import org.mule.api.construct.FlowConstruct;
 import org.mule.api.processor.MessageProcessor;
 import org.mule.api.store.ObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
-import org.mule.routing.MessageProcessorFilterPair;
 import org.mule.tck.junit4.AbstractMuleTestCase;
-
-import org.junit.Test;
-
-import junit.framework.Assert;
+import org.mule.tck.junit4.rule.SystemProperty;
 import org.mule.util.SerializationUtils;
+import org.mule.util.concurrent.Latch;
+import org.mule.util.lock.ServerLock;
+import org.mule.util.lock.ServerLockFactory;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
 {
 
     public static final String STRING_MESSAGE = "message";
-    public static final int MAX_REDELIVERY_COUNT = 1;
+    public static final int MAX_REDELIVERY_COUNT = 0;
     private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
     private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
     private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
-    private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+    private Latch waitLatch = new Latch();
+    private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+    private final IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
 
+    @Rule
+    public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+
     @Before
     public void setUpTest() throws MuleException
     {
         when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
-        System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+        when(mockWaitingMessageProcessor.process(event)).thenAnswer(new Answer<MuleEvent>()
+        {
+            @Override
+            public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                waitingMessageProcessorExecutionLatch.countDown();
+                waitLatch.await(2000, TimeUnit.MILLISECONDS);
+                return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+            }
+        });
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_LOCK_FACTORY)).thenReturn(new ServerLockFactory());
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+        InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+        when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+        when(event.getMessage()).thenReturn(message);
+        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+        irp.setUseSecureHash(true);
+        irp.setFlowConstruct(mock(FlowConstruct.class));
+        irp.setMuleContext(mockMuleContext);
+        irp.setListener(mockFailingMessageProcessor);
+        irp.setMessageProcessor(mockDlqMessageProcessor);
+
     }
 
     @Test
     public void messageDigestFailure() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
+        when(message.getPayload()).thenReturn(new Object());
         irp.initialise();
-
-
-        when(message.getPayload()).thenReturn(new Object());
-
-        when(event.getMessage()).thenReturn(message);
         MuleEvent process = irp.process(event);
         Assert.assertNull(process);
     }
@@ -82,22 +108,42 @@
     @Test
     public void testMessageRedeliveryUsingMemory() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
-        irp.setUseSecureHash(true);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
+    @Test
+    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    {
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
         irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
+    @Test
+    public void testThreadSafeObjectStoreUsage() throws Exception
+    {
         when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
+        irp.setListener(mockWaitingMessageProcessor);
+        irp.initialise();
 
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+        firstIrpExecutionThread.start();
+        ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+        threadCausingRedeliveryException.start();
+        waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+        waitLatch.release();
+        firstIrpExecutionThread.join();
+        threadCausingRedeliveryException.join();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
+
+    private void processUntilFailure()
+    {
+        for (int i = 0; i < MAX_REDELIVERY_COUNT + 2; i++)
         {
             try
             {
@@ -107,28 +153,14 @@
             {
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
 
-    @Test
-    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    public class ExecuteIrpThread extends Thread
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
-        irp.initialise();
-
-        when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
-
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        public Exception exception;
+        
+        @Override
+        public void run()
         {
             try
             {
@@ -136,15 +168,18 @@
             }
             catch (Exception e)
             {
+                exception = e;
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
+   
+    
 
     public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
     {
 
         private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+        private ServerLock lockableObjectStore = new ServerLock();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -177,11 +212,13 @@
         {
             return false;
         }
+
     }
 
     public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
     {
         private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+        private ServerLock lockableObjectStore = new ServerLock();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -212,6 +249,7 @@
         {
             return false;
         }
+
     }
 
 }

Added: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (0 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,90 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+    private final String name;
+    private String value;
+    private boolean initialized;
+    private String oldValue;
+
+    public SystemProperty(String name)
+    {
+        this(name, null);
+    }
+
+    public SystemProperty(String name, String value)
+    {
+        this.name = name;
+        this.value = value;
+    }
+
+    @Override
+    protected void before() throws Throwable
+    {
+        if (initialized)
+        {
+            throw new IllegalArgumentException("System property was already initialized");
+        }
+
+        oldValue = System.setProperty(name, getValue());
+        initialized = true;
+    }
+
+    @Override
+    protected void after()
+    {
+        if (!initialized)
+        {
+            throw new IllegalArgumentException("System property was not initialized");
+        }
+
+        doCleanUp();
+        restoreOldValue();
+
+        initialized = false;
+    }
+
+    protected void restoreOldValue()
+    {
+        if (oldValue == null)
+        {
+            System.clearProperty(name);
+        }
+        else
+        {
+            System.setProperty(name, oldValue);
+        }
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    protected void doCleanUp()
+    {
+        // Nothing to do
+    };
+
+    public String getValue()
+    {
+        return value;
+    };
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java (0 => 24786)


--- branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java	2012-08-21 05:18:14 UTC (rev 24786)
@@ -0,0 +1,151 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.lock;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class ServerLockTestCase extends AbstractMuleTestCase
+{
+    public static final int THREAD_COUNT = 100;
+    public static final int ITERATIONS_PER_THREAD = 100;
+    private Latch threadStartLatch = new Latch();
+    private String sharedKeyA = "A";
+    private String sharedKeyB = "B";
+    private ServerLock<String> serverLock = new ServerLock<String>();
+    private InMemoryObjectStore objectStore  = new InMemoryObjectStore();
+
+    @Test
+    public void testHighConcurrency() throws Exception
+    {
+        List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+        for (int i = 0; i < THREAD_COUNT; i++)
+        {
+            IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+            incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+        }
+        threadStartLatch.release();
+        for (Thread thread : threads)
+        {
+            thread.join();
+        }
+        assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+        assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+    }
+    
+    public class IncrementKeyValueThread extends Thread
+    {
+        private String key;
+
+        public IncrementKeyValueThread(String key)
+        {
+            super("Thread-" + key);
+            this.key = key;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+                for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+                {
+                    if (Thread.interrupted())
+                    {
+                        break;
+                    }
+                    serverLock.lock(key);
+                    try
+                    {
+                        Integer value;
+                        if (objectStore.contains(key))
+                        {
+                            value = objectStore.retrieve(key);
+                            objectStore.remove(key);
+                        }
+                        else
+                        {
+                            value = 0;
+                        }
+                        objectStore.store(key,value + 1);
+                    }
+                    finally
+                    {
+                        serverLock.unlock(key);
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static class InMemoryObjectStore implements ObjectStore<Integer>
+    {
+        private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+        @Override
+        public boolean contains(Serializable key) throws ObjectStoreException
+        {
+            return store.containsKey(key);
+        }
+
+        @Override
+        public void store(Serializable key, Integer value) throws ObjectStoreException
+        {
+            if (store.containsKey(key))
+            {
+                throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+            }
+            store.put(key,value);
+        }
+
+        @Override
+        public Integer retrieve(Serializable key) throws ObjectStoreException
+        {
+            return store.get(key);
+        }
+
+        @Override
+        public Integer remove(Serializable key) throws ObjectStoreException
+        {
+            return store.remove(key);
+        }
+
+        @Override
+        public boolean isPersistent()
+        {
+            return false;
+        }
+    }
+
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/lock/ServerLockTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml (24785 => 24786)


--- branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml	2012-08-21 03:52:07 UTC (rev 24785)
+++ branches/mule-3.2.x/modules/spring-config/src/main/resources/default-mule-config.xml	2012-08-21 05:18:14 UTC (rev 24786)
@@ -86,6 +86,8 @@
 
     <bean name="_defaultRetryPolicyTemplate" class="org.mule.retry.policies.NoRetryPolicyTemplate"/>
 
+    <bean name="_muleLockFactory" class="org.mule.util.lock.ServerLockFactory"/>
+
     <!-- Default Transformers are now loaded from META-INF/services/org/mule/config/registry-bootstrap.properties so that
     the transformers will be available even when using the TransientRegistry only -->
 

To unsubscribe from this list please visit:

http://xircles.codehaus.org/manage_email