Re: [mule-scm] [mule][24766] branches/mule-3.2.x/core/src: MULE-6403 - creating a locking mechanism for object store.

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Re: [mule-scm] [mule][24766] branches/mule-3.2.x/core/src: MULE-6403 - creating a locking mechanism for object store.

Pablo Kraan

Hi Pablo,

Here you have my comment on this revision: big commit -> big review


LockableObjectStore: not sure the name is right, you are not locking the OS but only a given key.

I would use lock(Serializable key) and unlock(Serializable key) instead of lockEntry and releaseEntry.  (or acquire/release)


LockEntry's javadoc is wrong on this part "read/update this key". Should say read/remove this key as there is no update operation in a OS.

What happens when there is no object in the store with that key? That must be also defined in the javadoc.


RelaseEntry: what happens if the key is not locked or is locked on a different thread? that info should be on the javadoc.


DefaultLockableObjectStore looks more like a LockableObjectStoreAdapter, ie, a way to convert a standard OS into a lockable one.

DefaultLockableObjectStore must lock/release each key before operating on it, otherwise the lock mechanism is useless.

The usage of the "this" keyword is redundant in many places.


MuleEntryLocker: name must be more generic because this interface can be used to lock anything, not just "entries.

Javadoc must be improved to define the contract more explicitly.

After seeing this interface Is not clear to me why you need to define a LockableObjectStore if you can just implement MuleEntryLocker in DefaultLockableObjectStore


MuleServerEntryLocker:

acquireLock must be final to avoid possible bugs.

Remove the redundant usage of "this"

The lock/release schema seems weak to me: any call to release will work without checking if the caller is the thread that holds the lock. I would expect this methods work similar to how the lock/unlock methods work on ReentrantLock.


I think there are a couple of race conditions on MuleServerEntryLockerTestCase when the key is not in the store:

1) The worker thread does lockableObjectStore.lock(key); and then it does a lockableObjectStore.release(key); on the finally block. In this case there will be an exception when the thread did not locked the key (what not in the store) but another thread locked it.

2) The worker thread does objectStore.contains(key)  and then objectStore.store(key,value + 1);. In this case there will be an exception when the object was already added in a different thread


Seems to me that the API must be extended in some way, maybe the lock method should create a lock even when the key is not there in order to avoid the race condition.


Pablo


On Fri, Aug 17, 2012 at 4:04 PM, <[hidden email]> wrote:
Revision
24766
Author
pablo.lagreca
Date
2012-08-17 14:04:00 -0500 (Fri, 17 Aug 2012)

Log Message

MULE-6403 - creating a locking mechanism for object store. Adding locking mechanism to IdempotentRedeliveryPolicy in order to avoid concurrency issues

Modified Paths

Added Paths

Diff

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -130,6 +130,7 @@
     public static final String OBJECT_QUEUE_MANAGER = "_muleQueueManager";
     public static final String OBJECT_STORE_DEFAULT_IN_MEMORY_NAME = "_defaultInMemoryObjectStore";
     public static final String OBJECT_STORE_DEFAULT_PERSISTENT_NAME = "_defaultPersistentObjectStore";
+    public static final String OBJECT_STORE_DEFAULT_LOCKER = "_defaultObjectStoreLocker";
     public static final String QUEUE_STORE_DEFAULT_IN_MEMORY_NAME = "_defaultInMemoryQueueStore";
     public static final String QUEUE_STORE_DEFAULT_PERSISTENT_NAME = "_defaultPersistentQueueStore";
     public static final String DEFAULT_USER_OBJECT_STORE_NAME = "_defaultUserObjectStore";

Added: branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,26 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.api.store;
+
+import java.io.Serializable;
+
+public interface LockableObjectStore<T extends Serializable> extends ObjectStore<T>
+{
+    /**
+     * Locks a key in the store so no other thread can read/update this key until is released.
+     */
+    void lockEntry(Serializable key);
+
+    /**
+     * Unlock a key in the store so other threads can access the entry associated with the key.
+     */
+    void releaseEntry(Serializable key);
+
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/store/ObjectStoreManager.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/store/ObjectStoreManager.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/store/ObjectStoreManager.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -37,4 +37,10 @@
      * Delete all objects from the partition
      */
     void disposeStore(ObjectStore<? extends Serializable> store) throws ObjectStoreException;
+
+    /**
+     * Return a LockableObjectStore using the mule default locker and the provided object store.
+     */
+    <T extends LockableObjectStore<? extends Serializable>> T getLockableObjectStore(ObjectStore<? extends Serializable> objectStore);
+
 }

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -49,7 +49,7 @@
     @Override
     public void initialise() throws InitialisationException
     {
-        if (maxRedeliveryCount < 1)
+        if (maxRedeliveryCount < 0)
         {
             throw new InitialisationException(
                 CoreMessages.initialisationFailure(

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -16,7 +16,7 @@
 import org.mule.api.lifecycle.InitialisationException;
 import org.mule.api.lifecycle.Startable;
 import org.mule.api.processor.MessageProcessor;
-import org.mule.api.store.ObjectStore;
+import org.mule.api.store.LockableObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
 import org.mule.api.transformer.TransformerException;
@@ -41,6 +41,11 @@
  */
 public class IdempotentRedeliveryPolicy extends AbstractRedeliveryPolicy
 {
+    private static final boolean OBJECT_STORE_NO_PERSISTENCE = false;
+    private static final int OBJECT_STORE_NO_ENTRY_LIMIT = -1;
+    private static final int OBJECT_STORE_FIVE_MINUTES_TTL = 60 * 5 * 1000;
+    private static final int OBJECT_STORE_EXPIRATION_INTERVAL = 6000;
+
     private final ObjectToByteArray objectToByteArray = new ObjectToByteArray();
     private final ByteArrayToHexString byteArrayToHexString = new ByteArrayToHexString();
 
@@ -49,7 +54,7 @@
     private boolean useSecureHash;
     private String messageDigestAlgorithm;
     private String idExpression;
-    private ObjectStore<AtomicInteger> store;
+    private LockableObjectStore<AtomicInteger> store;
 
     @Override
     public void initialise() throws InitialisationException
@@ -98,11 +103,12 @@
         store = createStore();
     }
 
-    private ObjectStore<AtomicInteger> createStore() throws InitialisationException
+    private LockableObjectStore<AtomicInteger> createStore() throws InitialisationException
     {
         ObjectStoreManager objectStoreManager = (ObjectStoreManager) muleContext.getRegistry().get(
                                 MuleProperties.OBJECT_STORE_MANAGER);
-        return objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(), false, -1,  60 * 5 * 1000, 6000 );
+        return objectStoreManager.getLockableObjectStore(objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(),
+                OBJECT_STORE_NO_PERSISTENCE, OBJECT_STORE_NO_ENTRY_LIMIT, OBJECT_STORE_FIVE_MINUTES_TTL, OBJECT_STORE_EXPIRATION_INTERVAL));
     }
 
 
@@ -165,45 +171,55 @@
             exceptionSeen = true;
         }
 
-        if (!exceptionSeen)
+        try
         {
-            counter = findCounter(messageId);
-            tooMany = counter != null && counter.get() > maxRedeliveryCount;
-        }
+            store.lockEntry(messageId);
 
-        if (tooMany || exceptionSeen)
-        {
-            try
+            if (!exceptionSeen)
             {
-                return deadLetterQueue.process(event);
+                counter = findCounter(messageId);
+                tooMany = counter != null && counter.get() > maxRedeliveryCount;
             }
-            catch (Exception ex)
-            {
-                logger.info("Exception thrown from failed message processing for message " + messageId, ex);
-            }
-            return null;
-        }
 
-        try
-        {
-            MuleEvent returnEvent = processNext(event);
-            counter = findCounter(messageId);
-            if (counter != null)
-            {
-                resetCounter(messageId);
-            }
-            return returnEvent;
+            if (tooMany || exceptionSeen)
+                {
+                    try
+                    {
+                        return deadLetterQueue.process(event);
+                    }
+                    catch (Exception ex)
+                    {
+                        logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                    }
+                    return null;
+                }
+        
+                try
+                {
+                    MuleEvent returnEvent = processNext(event);
+                    counter = findCounter(messageId);
+                    if (counter != null)
+                    {
+                        resetCounter(messageId);
+                    }
+                    return returnEvent;
+                }
+                catch (MuleException ex)
+                {
+                    incrementCounter(messageId);
+                    throw ex;
+                }
+                catch (RuntimeException ex)
+                {
+                    incrementCounter(messageId);
+                    throw ex;
+                }
         }
-        catch (MuleException ex)
+        finally 
         {
-            incrementCounter(messageId);
-            throw ex;
+            store.releaseEntry(messageId);
         }
-        catch (RuntimeException ex)
-        {
-            incrementCounter(messageId);
-            throw ex;
-        }
+        
     }
 
     private void resetCounter(String messageId) throws ObjectStoreException

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,70 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+import org.mule.api.store.LockableObjectStore;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+
+import java.io.Serializable;
+
+public class DefaultLockableObjectStore<T extends Serializable> implements LockableObjectStore<T>
+{
+    private ObjectStore<T> objectStore;
+    private MuleEntryLocker entryLocker;
+
+    public DefaultLockableObjectStore(ObjectStore<T> objectStore, MuleEntryLocker muleEntryLocker)
+    {
+        this.objectStore = objectStore;
+        this.entryLocker = muleEntryLocker;
+    }
+
+    @Override
+    public void lockEntry(Serializable key)
+    {
+        this.entryLocker.lock(key);
+    }
+
+    @Override
+    public void releaseEntry(Serializable key)
+    {
+        this.entryLocker.release(key);
+    }
+
+    @Override
+    public boolean contains(Serializable key) throws ObjectStoreException
+    {
+        return objectStore.contains(key);
+    }
+
+    @Override
+    public void store(Serializable key, T value) throws ObjectStoreException
+    {
+        objectStore.store(key,value);
+    }
+
+    @Override
+    public T retrieve(Serializable key) throws ObjectStoreException
+    {
+        return objectStore.retrieve(key);
+    }
+
+    @Override
+    public T remove(Serializable key) throws ObjectStoreException
+    {
+        return objectStore.remove(key);
+    }
+
+    @Override
+    public boolean isPersistent()
+    {
+        return objectStore.isPersistent();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,29 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+/**
+ *
+ * Provides a lock mechanism for share information in Mule.
+ *
+ */
+public interface MuleEntryLocker<T>
+{
+    /**
+     * Creates a lock around a lockIdentifier.
+     * To release lock use release method with the same identifier
+     */
+    public void lock(T lockIdentifier);
+
+    /**
+     *  Releases a lock previously locked.
+     */
+    public void release(T lockIdentifier);
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleObjectStoreManager.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleObjectStoreManager.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleObjectStoreManager.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -10,6 +10,8 @@
 
 package org.mule.util.store;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.mule.api.MuleContext;
 import org.mule.api.MuleRuntimeException;
 import org.mule.api.config.MuleProperties;
@@ -18,6 +20,7 @@
 import org.mule.api.lifecycle.Initialisable;
 import org.mule.api.lifecycle.InitialisationException;
 import org.mule.api.store.ListableObjectStore;
+import org.mule.api.store.LockableObjectStore;
 import org.mule.api.store.ObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
@@ -32,9 +35,6 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 public class MuleObjectStoreManager
     implements ObjectStoreManager, MuleContextAware, Initialisable, Disposable
 {
@@ -259,4 +259,20 @@
             }
         }
     }
+
+    @Override
+    public <T extends LockableObjectStore<? extends Serializable>> T getLockableObjectStore(ObjectStore<? extends Serializable> objectStore)
+    {
+        LockableObjectStore lockableObjectStore;
+        if (objectStore instanceof LockableObjectStore)
+        {
+            lockableObjectStore = (LockableObjectStore) objectStore;
+        }
+        else
+        {
+            MuleEntryLocker muleEntryLocker = muleContext.getRegistry().get(MuleProperties.OBJECT_STORE_DEFAULT_LOCKER);
+            lockableObjectStore = new DefaultLockableObjectStore(objectStore, muleEntryLocker);
+        }
+        return (T) lockableObjectStore;
+    }
 }

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,60 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class MuleServerEntryLocker implements MuleEntryLocker<Serializable>
+{
+    private Map<Serializable, ReentrantLock> locks;
+    private Object acquireLock = new Object();
+
+    public MuleServerEntryLocker()
+    {
+        this.locks = new HashMap<Serializable,ReentrantLock>();
+    }
+
+    public void lock(Serializable key)
+    {
+        ReentrantLock lock;
+        synchronized (acquireLock)
+        {
+            if (this.locks.containsKey(key))
+            {
+                lock = this.locks.get(key);
+            }
+            else
+            {
+                lock = new ReentrantLock(true);
+                this.locks.put(key,lock);
+            }
+        }
+        lock.lock();
+    }
+
+    public void release(Serializable key)
+    {
+        synchronized (acquireLock)
+        {
+            ReentrantLock reentrantLock = this.locks.get(key);
+            if (reentrantLock != null)
+            {
+                if (!reentrantLock.hasQueuedThreads())
+                {
+                    this.locks.remove(key);
+                }
+                reentrantLock.unlock();
+            }
+        }
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -10,71 +10,97 @@
 
 package org.mule.processor;
 
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
+import junit.framework.Assert;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
 import org.mockito.Answers;
-import org.mockito.Mockito;
 import org.mockito.internal.verification.VerificationModeFactory;
-import org.mule.api.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
 import org.mule.api.config.MuleProperties;
 import org.mule.api.construct.FlowConstruct;
 import org.mule.api.processor.MessageProcessor;
+import org.mule.api.store.LockableObjectStore;
 import org.mule.api.store.ObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
-import org.mule.routing.MessageProcessorFilterPair;
 import org.mule.tck.junit4.AbstractMuleTestCase;
-
-import org.junit.Test;
-
-import junit.framework.Assert;
+import org.mule.tck.junit4.rule.SystemProperty;
 import org.mule.util.SerializationUtils;
+import org.mule.util.concurrent.Latch;
+import org.mule.util.store.MuleServerEntryLocker;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
 {
 
     public static final String STRING_MESSAGE = "message";
-    public static final int MAX_REDELIVERY_COUNT = 1;
+    public static final int MAX_REDELIVERY_COUNT = 0;
     private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
     private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
     private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
-    private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+    private Latch waitLatch = new Latch();
+    private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+    private final IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
 
+    @Rule
+    public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+
     @Before
     public void setUpTest() throws MuleException
     {
         when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
-        System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+        when(mockWaitingMessageProcessor.process(event)).thenAnswer(new Answer<MuleEvent>()
+        {
+            @Override
+            public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                waitingMessageProcessorExecutionLatch.countDown();
+                waitLatch.await(2000, TimeUnit.MILLISECONDS);
+                return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+            }
+        });
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+        InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+        when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+        when(mockObjectStoreManager.getLockableObjectStore(inMemoryObjectStore)).thenReturn(inMemoryObjectStore);
+        when(event.getMessage()).thenReturn(message);
+        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+        irp.setUseSecureHash(true);
+        irp.setFlowConstruct(mock(FlowConstruct.class));
+        irp.setMuleContext(mockMuleContext);
+        irp.setListener(mockFailingMessageProcessor);
+        irp.setMessageProcessor(mockDlqMessageProcessor);
+
     }
 
     @Test
     public void messageDigestFailure() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
+        when(message.getPayload()).thenReturn(new Object());
         irp.initialise();
-
-
-        when(message.getPayload()).thenReturn(new Object());
-
-        when(event.getMessage()).thenReturn(message);
         MuleEvent process = irp.process(event);
         Assert.assertNull(process);
     }
@@ -82,22 +108,43 @@
     @Test
     public void testMessageRedeliveryUsingMemory() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
-        irp.setUseSecureHash(true);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
+    @Test
+    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    {
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        when(mockObjectStoreManager.getLockableObjectStore(any(ObjectStore.class))).thenReturn(new SerializationObjectStore());
         irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
+    @Test
+    public void testThreadSafeObjectStoreUsage() throws Exception
+    {
         when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
+        irp.setListener(mockWaitingMessageProcessor);
+        irp.initialise();
 
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+        firstIrpExecutionThread.start();
+        ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+        threadCausingRedeliveryException.start();
+        waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+        waitLatch.release();
+        firstIrpExecutionThread.join();
+        threadCausingRedeliveryException.join();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
+
+    private void processUntilFailure()
+    {
+        for (int i = 0; i < MAX_REDELIVERY_COUNT + 2; i++)
         {
             try
             {
@@ -107,28 +154,14 @@
             {
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
 
-    @Test
-    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    public class ExecuteIrpThread extends Thread
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
-        irp.initialise();
-
-        when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
-
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        public Exception exception;
+        
+        @Override
+        public void run()
         {
             try
             {
@@ -136,15 +169,18 @@
             }
             catch (Exception e)
             {
+                exception = e;
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
+   
+    
 
-    public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
+    public static class SerializationObjectStore implements LockableObjectStore<AtomicInteger>
     {
 
         private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+        private MuleServerEntryLocker lockableObjectStore = new MuleServerEntryLocker();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -177,11 +213,24 @@
         {
             return false;
         }
+
+        @Override
+        public void lockEntry(Serializable key)
+        {
+            lockableObjectStore.lock(key);
+        }
+
+        @Override
+        public void releaseEntry(Serializable key)
+        {
+            lockableObjectStore.release(key);
+        }
     }
 
-    public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
+    public static class InMemoryObjectStore implements LockableObjectStore<AtomicInteger>
     {
         private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+        private MuleServerEntryLocker lockableObjectStore = new MuleServerEntryLocker();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -212,6 +261,18 @@
         {
             return false;
         }
+
+        @Override
+        public void lockEntry(Serializable key)
+        {
+            lockableObjectStore.lock(key);
+        }
+
+        @Override
+        public void releaseEntry(Serializable key)
+        {
+            lockableObjectStore.release(key);
+        }
     }
 
 }

Added: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (0 => 24766)


--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,90 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+    private final String name;
+    private String value;
+    private boolean initialized;
+    private String oldValue;
+
+    public SystemProperty(String name)
+    {
+        this(name, null);
+    }
+
+    public SystemProperty(String name, String value)
+    {
+        this.name = name;
+        this.value = value;
+    }
+
+    @Override
+    protected void before() throws Throwable
+    {
+        if (initialized)
+        {
+            throw new IllegalArgumentException("System property was already initialized");
+        }
+
+        oldValue = System.setProperty(name, getValue());
+        initialized = true;
+    }
+
+    @Override
+    protected void after()
+    {
+        if (!initialized)
+        {
+            throw new IllegalArgumentException("System property was not initialized");
+        }
+
+        doCleanUp();
+        restoreOldValue();
+
+        initialized = false;
+    }
+
+    protected void restoreOldValue()
+    {
+        if (oldValue == null)
+        {
+            System.clearProperty(name);
+        }
+        else
+        {
+            System.setProperty(name, oldValue);
+        }
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    protected void doCleanUp()
+    {
+        // Nothing to do
+    };
+
+    public String getValue()
+    {
+        return value;
+    };
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java (0 => 24766)


--- branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,148 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class MuleServerEntryLockerTestCase
+{
+    public static final int THREAD_COUNT = 100;
+    public static final int ITERATIONS_PER_THREAD = 1000;
+    private String sharedKeyA = "A";
+    private String sharedKeyB = "B";
+    private MuleServerEntryLocker lockableObjectStore = new MuleServerEntryLocker();
+    private InMemoryObjectStore objectStore  = new InMemoryObjectStore();
+    private Latch threadStartLatch = new Latch();
+
+
+    
+    @Test
+    public void testHighConcurrency() throws Exception
+    {
+        List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+        for (int i = 0; i < THREAD_COUNT; i++)
+        {
+            IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+            incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+        }
+        threadStartLatch.release();
+        for (Thread thread : threads)
+        {
+            thread.join();
+        }
+        assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+        assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+    }
+    
+    public class IncrementKeyValueThread extends Thread
+    {
+        private String key;
+
+        public IncrementKeyValueThread(String key)
+        {
+            super("Thread-" + key);
+            this.key = key;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+                for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+                {
+                    try
+                    {
+                        lockableObjectStore.lock(key);
+                        Integer value;
+                        if (objectStore.contains(key))
+                        {
+                            value = objectStore.retrieve(key);
+                            objectStore.remove(key);
+                        }
+                        else
+                        {
+                            value = 0;
+                        }
+                        objectStore.store(key,value + 1);
+                    }
+                    finally
+                    {
+                        lockableObjectStore.release(key);
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static class InMemoryObjectStore implements ObjectStore<Integer>
+    {
+        private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+        @Override
+        public boolean contains(Serializable key) throws ObjectStoreException
+        {
+            return store.containsKey(key);
+        }
+
+        @Override
+        public void store(Serializable key, Integer value) throws ObjectStoreException
+        {
+            if (store.containsKey(key))
+            {
+                throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+            }
+            store.put(key,value);
+        }
+
+        @Override
+        public Integer retrieve(Serializable key) throws ObjectStoreException
+        {
+            return store.get(key);
+        }
+
+        @Override
+        public Integer remove(Serializable key) throws ObjectStoreException
+        {
+            return store.remove(key);
+        }
+
+        @Override
+        public boolean isPersistent()
+        {
+            return false;
+        }
+    }
+
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style


To unsubscribe from this list please visit:

http://xircles.codehaus.org/manage_email


Reply | Threaded
Open this post in threaded view
|

Re: [mule-scm] [mule][24766] branches/mule-3.2.x/core/src: MULE-6403 - creating a locking mechanism for object store.

David Dossot
This post has NOT been accepted by the mailing list yet.
Any CDEV doc that explains the rationale for LockableObjectStore?


On Sat, Aug 18, 2012 at 3:59 PM, Pablo Kraan [via Mule] <[hidden email]> wrote:

Hi Pablo,

Here you have my comment on this revision: big commit -> big review


LockableObjectStore: not sure the name is right, you are not locking the OS but only a given key.

I would use lock(Serializable key) and unlock(Serializable key) instead of lockEntry and releaseEntry.  (or acquire/release)


LockEntry's javadoc is wrong on this part "read/update this key". Should say read/remove this key as there is no update operation in a OS.

What happens when there is no object in the store with that key? That must be also defined in the javadoc.


RelaseEntry: what happens if the key is not locked or is locked on a different thread? that info should be on the javadoc.


DefaultLockableObjectStore looks more like a LockableObjectStoreAdapter, ie, a way to convert a standard OS into a lockable one.

DefaultLockableObjectStore must lock/release each key before operating on it, otherwise the lock mechanism is useless.

The usage of the "this" keyword is redundant in many places.


MuleEntryLocker: name must be more generic because this interface can be used to lock anything, not just "entries.

Javadoc must be improved to define the contract more explicitly.

After seeing this interface Is not clear to me why you need to define a LockableObjectStore if you can just implement MuleEntryLocker in DefaultLockableObjectStore


MuleServerEntryLocker:

acquireLock must be final to avoid possible bugs.

Remove the redundant usage of "this"

The lock/release schema seems weak to me: any call to release will work without checking if the caller is the thread that holds the lock. I would expect this methods work similar to how the lock/unlock methods work on ReentrantLock.


I think there are a couple of race conditions on MuleServerEntryLockerTestCase when the key is not in the store:

1) The worker thread does lockableObjectStore.lock(key); and then it does a lockableObjectStore.release(key); on the finally block. In this case there will be an exception when the thread did not locked the key (what not in the store) but another thread locked it.

2) The worker thread does objectStore.contains(key)  and then objectStore.store(key,value + 1);. In this case there will be an exception when the object was already added in a different thread


Seems to me that the API must be extended in some way, maybe the lock method should create a lock even when the key is not there in order to avoid the race condition.


Pablo


On Fri, Aug 17, 2012 at 4:04 PM, <[hidden email]> wrote:
Revision
24766
Author
pablo.lagreca
Date
2012-08-17 14:04:00 -0500 (Fri, 17 Aug 2012)

Log Message

MULE-6403 - creating a locking mechanism for object store. Adding locking mechanism to IdempotentRedeliveryPolicy in order to avoid concurrency issues

Modified Paths

Added Paths

Diff

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -130,6 +130,7 @@
     public static final String OBJECT_QUEUE_MANAGER = "_muleQueueManager";
     public static final String OBJECT_STORE_DEFAULT_IN_MEMORY_NAME = "_defaultInMemoryObjectStore";
     public static final String OBJECT_STORE_DEFAULT_PERSISTENT_NAME = "_defaultPersistentObjectStore";
+    public static final String OBJECT_STORE_DEFAULT_LOCKER = "_defaultObjectStoreLocker";
     public static final String QUEUE_STORE_DEFAULT_IN_MEMORY_NAME = "_defaultInMemoryQueueStore";
     public static final String QUEUE_STORE_DEFAULT_PERSISTENT_NAME = "_defaultPersistentQueueStore";
     public static final String DEFAULT_USER_OBJECT_STORE_NAME = "_defaultUserObjectStore";

Added: branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,26 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.api.store;
+
+import java.io.Serializable;
+
+public interface LockableObjectStore<T extends Serializable> extends ObjectStore<T>
+{
+    /**
+     * Locks a key in the store so no other thread can read/update this key until is released.
+     */
+    void lockEntry(Serializable key);
+
+    /**
+     * Unlock a key in the store so other threads can access the entry associated with the key.
+     */
+    void releaseEntry(Serializable key);
+
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/store/ObjectStoreManager.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/store/ObjectStoreManager.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/store/ObjectStoreManager.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -37,4 +37,10 @@
      * Delete all objects from the partition
      */
     void disposeStore(ObjectStore<? extends Serializable> store) throws ObjectStoreException;
+
+    /**
+     * Return a LockableObjectStore using the mule default locker and the provided object store.
+     */
+    <T extends LockableObjectStore<? extends Serializable>> T getLockableObjectStore(ObjectStore<? extends Serializable> objectStore);
+
 }

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -49,7 +49,7 @@
     @Override
     public void initialise() throws InitialisationException
     {
-        if (maxRedeliveryCount < 1)
+        if (maxRedeliveryCount < 0)
         {
             throw new InitialisationException(
                 CoreMessages.initialisationFailure(

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -16,7 +16,7 @@
 import org.mule.api.lifecycle.InitialisationException;
 import org.mule.api.lifecycle.Startable;
 import org.mule.api.processor.MessageProcessor;
-import org.mule.api.store.ObjectStore;
+import org.mule.api.store.LockableObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
 import org.mule.api.transformer.TransformerException;
@@ -41,6 +41,11 @@
  */
 public class IdempotentRedeliveryPolicy extends AbstractRedeliveryPolicy
 {
+    private static final boolean OBJECT_STORE_NO_PERSISTENCE = false;
+    private static final int OBJECT_STORE_NO_ENTRY_LIMIT = -1;
+    private static final int OBJECT_STORE_FIVE_MINUTES_TTL = 60 * 5 * 1000;
+    private static final int OBJECT_STORE_EXPIRATION_INTERVAL = 6000;
+
     private final ObjectToByteArray objectToByteArray = new ObjectToByteArray();
     private final ByteArrayToHexString byteArrayToHexString = new ByteArrayToHexString();
 
@@ -49,7 +54,7 @@
     private boolean useSecureHash;
     private String messageDigestAlgorithm;
     private String idExpression;
-    private ObjectStore<AtomicInteger> store;
+    private LockableObjectStore<AtomicInteger> store;
 
     @Override
     public void initialise() throws InitialisationException
@@ -98,11 +103,12 @@
         store = createStore();
     }
 
-    private ObjectStore<AtomicInteger> createStore() throws InitialisationException
+    private LockableObjectStore<AtomicInteger> createStore() throws InitialisationException
     {
         ObjectStoreManager objectStoreManager = (ObjectStoreManager) muleContext.getRegistry().get(
                                 MuleProperties.OBJECT_STORE_MANAGER);
-        return objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(), false, -1,  60 * 5 * 1000, 6000 );
+        return objectStoreManager.getLockableObjectStore(objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(),
+                OBJECT_STORE_NO_PERSISTENCE, OBJECT_STORE_NO_ENTRY_LIMIT, OBJECT_STORE_FIVE_MINUTES_TTL, OBJECT_STORE_EXPIRATION_INTERVAL));
     }
 
 
@@ -165,45 +171,55 @@
             exceptionSeen = true;
         }
 
-        if (!exceptionSeen)
+        try
         {
-            counter = findCounter(messageId);
-            tooMany = counter != null && counter.get() > maxRedeliveryCount;
-        }
+            store.lockEntry(messageId);
 
-        if (tooMany || exceptionSeen)
-        {
-            try
+            if (!exceptionSeen)
             {
-                return deadLetterQueue.process(event);
+                counter = findCounter(messageId);
+                tooMany = counter != null && counter.get() > maxRedeliveryCount;
             }
-            catch (Exception ex)
-            {
-                logger.info("Exception thrown from failed message processing for message " + messageId, ex);
-            }
-            return null;
-        }
 
-        try
-        {
-            MuleEvent returnEvent = processNext(event);
-            counter = findCounter(messageId);
-            if (counter != null)
-            {
-                resetCounter(messageId);
-            }
-            return returnEvent;
+            if (tooMany || exceptionSeen)
+                {
+                    try
+                    {
+                        return deadLetterQueue.process(event);
+                    }
+                    catch (Exception ex)
+                    {
+                        logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                    }
+                    return null;
+                }
+        
+                try
+                {
+                    MuleEvent returnEvent = processNext(event);
+                    counter = findCounter(messageId);
+                    if (counter != null)
+                    {
+                        resetCounter(messageId);
+                    }
+                    return returnEvent;
+                }
+                catch (MuleException ex)
+                {
+                    incrementCounter(messageId);
+                    throw ex;
+                }
+                catch (RuntimeException ex)
+                {
+                    incrementCounter(messageId);
+                    throw ex;
+                }
         }
-        catch (MuleException ex)
+        finally 
         {
-            incrementCounter(messageId);
-            throw ex;
+            store.releaseEntry(messageId);
         }
-        catch (RuntimeException ex)
-        {
-            incrementCounter(messageId);
-            throw ex;
-        }
+        
     }
 
     private void resetCounter(String messageId) throws ObjectStoreException

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,70 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+import org.mule.api.store.LockableObjectStore;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+
+import java.io.Serializable;
+
+public class DefaultLockableObjectStore<T extends Serializable> implements LockableObjectStore<T>
+{
+    private ObjectStore<T> objectStore;
+    private MuleEntryLocker entryLocker;
+
+    public DefaultLockableObjectStore(ObjectStore<T> objectStore, MuleEntryLocker muleEntryLocker)
+    {
+        this.objectStore = objectStore;
+        this.entryLocker = muleEntryLocker;
+    }
+
+    @Override
+    public void lockEntry(Serializable key)
+    {
+        this.entryLocker.lock(key);
+    }
+
+    @Override
+    public void releaseEntry(Serializable key)
+    {
+        this.entryLocker.release(key);
+    }
+
+    @Override
+    public boolean contains(Serializable key) throws ObjectStoreException
+    {
+        return objectStore.contains(key);
+    }
+
+    @Override
+    public void store(Serializable key, T value) throws ObjectStoreException
+    {
+        objectStore.store(key,value);
+    }
+
+    @Override
+    public T retrieve(Serializable key) throws ObjectStoreException
+    {
+        return objectStore.retrieve(key);
+    }
+
+    @Override
+    public T remove(Serializable key) throws ObjectStoreException
+    {
+        return objectStore.remove(key);
+    }
+
+    @Override
+    public boolean isPersistent()
+    {
+        return objectStore.isPersistent();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,29 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+/**
+ *
+ * Provides a lock mechanism for share information in Mule.
+ *
+ */
+public interface MuleEntryLocker<T>
+{
+    /**
+     * Creates a lock around a lockIdentifier.
+     * To release lock use release method with the same identifier
+     */
+    public void lock(T lockIdentifier);
+
+    /**
+     *  Releases a lock previously locked.
+     */
+    public void release(T lockIdentifier);
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleObjectStoreManager.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleObjectStoreManager.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleObjectStoreManager.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -10,6 +10,8 @@
 
 package org.mule.util.store;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.mule.api.MuleContext;
 import org.mule.api.MuleRuntimeException;
 import org.mule.api.config.MuleProperties;
@@ -18,6 +20,7 @@
 import org.mule.api.lifecycle.Initialisable;
 import org.mule.api.lifecycle.InitialisationException;
 import org.mule.api.store.ListableObjectStore;
+import org.mule.api.store.LockableObjectStore;
 import org.mule.api.store.ObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
@@ -32,9 +35,6 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 public class MuleObjectStoreManager
     implements ObjectStoreManager, MuleContextAware, Initialisable, Disposable
 {
@@ -259,4 +259,20 @@
             }
         }
     }
+
+    @Override
+    public <T extends LockableObjectStore<? extends Serializable>> T getLockableObjectStore(ObjectStore<? extends Serializable> objectStore)
+    {
+        LockableObjectStore lockableObjectStore;
+        if (objectStore instanceof LockableObjectStore)
+        {
+            lockableObjectStore = (LockableObjectStore) objectStore;
+        }
+        else
+        {
+            MuleEntryLocker muleEntryLocker = muleContext.getRegistry().get(MuleProperties.OBJECT_STORE_DEFAULT_LOCKER);
+            lockableObjectStore = new DefaultLockableObjectStore(objectStore, muleEntryLocker);
+        }
+        return (T) lockableObjectStore;
+    }
 }

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,60 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class MuleServerEntryLocker implements MuleEntryLocker<Serializable>
+{
+    private Map<Serializable, ReentrantLock> locks;
+    private Object acquireLock = new Object();
+
+    public MuleServerEntryLocker()
+    {
+        this.locks = new HashMap<Serializable,ReentrantLock>();
+    }
+
+    public void lock(Serializable key)
+    {
+        ReentrantLock lock;
+        synchronized (acquireLock)
+        {
+            if (this.locks.containsKey(key))
+            {
+                lock = this.locks.get(key);
+            }
+            else
+            {
+                lock = new ReentrantLock(true);
+                this.locks.put(key,lock);
+            }
+        }
+        lock.lock();
+    }
+
+    public void release(Serializable key)
+    {
+        synchronized (acquireLock)
+        {
+            ReentrantLock reentrantLock = this.locks.get(key);
+            if (reentrantLock != null)
+            {
+                if (!reentrantLock.hasQueuedThreads())
+                {
+                    this.locks.remove(key);
+                }
+                reentrantLock.unlock();
+            }
+        }
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -10,71 +10,97 @@
 
 package org.mule.processor;
 
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
+import junit.framework.Assert;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
 import org.mockito.Answers;
-import org.mockito.Mockito;
 import org.mockito.internal.verification.VerificationModeFactory;
-import org.mule.api.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
 import org.mule.api.config.MuleProperties;
 import org.mule.api.construct.FlowConstruct;
 import org.mule.api.processor.MessageProcessor;
+import org.mule.api.store.LockableObjectStore;
 import org.mule.api.store.ObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
-import org.mule.routing.MessageProcessorFilterPair;
 import org.mule.tck.junit4.AbstractMuleTestCase;
-
-import org.junit.Test;
-
-import junit.framework.Assert;
+import org.mule.tck.junit4.rule.SystemProperty;
 import org.mule.util.SerializationUtils;
+import org.mule.util.concurrent.Latch;
+import org.mule.util.store.MuleServerEntryLocker;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
 {
 
     public static final String STRING_MESSAGE = "message";
-    public static final int MAX_REDELIVERY_COUNT = 1;
+    public static final int MAX_REDELIVERY_COUNT = 0;
     private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
     private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
     private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
-    private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+    private Latch waitLatch = new Latch();
+    private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+    private final IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
 
+    @Rule
+    public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+
     @Before
     public void setUpTest() throws MuleException
     {
         when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
-        System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+        when(mockWaitingMessageProcessor.process(event)).thenAnswer(new Answer<MuleEvent>()
+        {
+            @Override
+            public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                waitingMessageProcessorExecutionLatch.countDown();
+                waitLatch.await(2000, TimeUnit.MILLISECONDS);
+                return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+            }
+        });
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+        InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+        when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+        when(mockObjectStoreManager.getLockableObjectStore(inMemoryObjectStore)).thenReturn(inMemoryObjectStore);
+        when(event.getMessage()).thenReturn(message);
+        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+        irp.setUseSecureHash(true);
+        irp.setFlowConstruct(mock(FlowConstruct.class));
+        irp.setMuleContext(mockMuleContext);
+        irp.setListener(mockFailingMessageProcessor);
+        irp.setMessageProcessor(mockDlqMessageProcessor);
+
     }
 
     @Test
     public void messageDigestFailure() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
+        when(message.getPayload()).thenReturn(new Object());
         irp.initialise();
-
-
-        when(message.getPayload()).thenReturn(new Object());
-
-        when(event.getMessage()).thenReturn(message);
         MuleEvent process = irp.process(event);
         Assert.assertNull(process);
     }
@@ -82,22 +108,43 @@
     @Test
     public void testMessageRedeliveryUsingMemory() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
-        irp.setUseSecureHash(true);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
+    @Test
+    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    {
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        when(mockObjectStoreManager.getLockableObjectStore(any(ObjectStore.class))).thenReturn(new SerializationObjectStore());
         irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
+    @Test
+    public void testThreadSafeObjectStoreUsage() throws Exception
+    {
         when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
+        irp.setListener(mockWaitingMessageProcessor);
+        irp.initialise();
 
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+        firstIrpExecutionThread.start();
+        ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+        threadCausingRedeliveryException.start();
+        waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+        waitLatch.release();
+        firstIrpExecutionThread.join();
+        threadCausingRedeliveryException.join();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
+
+    private void processUntilFailure()
+    {
+        for (int i = 0; i < MAX_REDELIVERY_COUNT + 2; i++)
         {
             try
             {
@@ -107,28 +154,14 @@
             {
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
 
-    @Test
-    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    public class ExecuteIrpThread extends Thread
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
-        irp.initialise();
-
-        when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
-
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        public Exception exception;
+        
+        @Override
+        public void run()
         {
             try
             {
@@ -136,15 +169,18 @@
             }
             catch (Exception e)
             {
+                exception = e;
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
+   
+    
 
-    public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
+    public static class SerializationObjectStore implements LockableObjectStore<AtomicInteger>
     {
 
         private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+        private MuleServerEntryLocker lockableObjectStore = new MuleServerEntryLocker();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -177,11 +213,24 @@
         {
             return false;
         }
+
+        @Override
+        public void lockEntry(Serializable key)
+        {
+            lockableObjectStore.lock(key);
+        }
+
+        @Override
+        public void releaseEntry(Serializable key)
+        {
+            lockableObjectStore.release(key);
+        }
     }
 
-    public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
+    public static class InMemoryObjectStore implements LockableObjectStore<AtomicInteger>
     {
         private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+        private MuleServerEntryLocker lockableObjectStore = new MuleServerEntryLocker();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -212,6 +261,18 @@
         {
             return false;
         }
+
+        @Override
+        public void lockEntry(Serializable key)
+        {
+            lockableObjectStore.lock(key);
+        }
+
+        @Override
+        public void releaseEntry(Serializable key)
+        {
+            lockableObjectStore.release(key);
+        }
     }
 
 }

Added: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (0 => 24766)


--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,90 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+    private final String name;
+    private String value;
+    private boolean initialized;
+    private String oldValue;
+
+    public SystemProperty(String name)
+    {
+        this(name, null);
+    }
+
+    public SystemProperty(String name, String value)
+    {
+        this.name = name;
+        this.value = value;
+    }
+
+    @Override
+    protected void before() throws Throwable
+    {
+        if (initialized)
+        {
+            throw new IllegalArgumentException("System property was already initialized");
+        }
+
+        oldValue = System.setProperty(name, getValue());
+        initialized = true;
+    }
+
+    @Override
+    protected void after()
+    {
+        if (!initialized)
+        {
+            throw new IllegalArgumentException("System property was not initialized");
+        }
+
+        doCleanUp();
+        restoreOldValue();
+
+        initialized = false;
+    }
+
+    protected void restoreOldValue()
+    {
+        if (oldValue == null)
+        {
+            System.clearProperty(name);
+        }
+        else
+        {
+            System.setProperty(name, oldValue);
+        }
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    protected void doCleanUp()
+    {
+        // Nothing to do
+    };
+
+    public String getValue()
+    {
+        return value;
+    };
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java (0 => 24766)


--- branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,148 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class MuleServerEntryLockerTestCase
+{
+    public static final int THREAD_COUNT = 100;
+    public static final int ITERATIONS_PER_THREAD = 1000;
+    private String sharedKeyA = "A";
+    private String sharedKeyB = "B";
+    private MuleServerEntryLocker lockableObjectStore = new MuleServerEntryLocker();
+    private InMemoryObjectStore objectStore  = new InMemoryObjectStore();
+    private Latch threadStartLatch = new Latch();
+
+
+    
+    @Test
+    public void testHighConcurrency() throws Exception
+    {
+        List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+        for (int i = 0; i < THREAD_COUNT; i++)
+        {
+            IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+            incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+        }
+        threadStartLatch.release();
+        for (Thread thread : threads)
+        {
+            thread.join();
+        }
+        assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+        assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+    }
+    
+    public class IncrementKeyValueThread extends Thread
+    {
+        private String key;
+
+        public IncrementKeyValueThread(String key)
+        {
+            super("Thread-" + key);
+            this.key = key;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+                for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+                {
+                    try
+                    {
+                        lockableObjectStore.lock(key);
+                        Integer value;
+                        if (objectStore.contains(key))
+                        {
+                            value = objectStore.retrieve(key);
+                            objectStore.remove(key);
+                        }
+                        else
+                        {
+                            value = 0;
+                        }
+                        objectStore.store(key,value + 1);
+                    }
+                    finally
+                    {
+                        lockableObjectStore.release(key);
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static class InMemoryObjectStore implements ObjectStore<Integer>
+    {
+        private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+        @Override
+        public boolean contains(Serializable key) throws ObjectStoreException
+        {
+            return store.containsKey(key);
+        }
+
+        @Override
+        public void store(Serializable key, Integer value) throws ObjectStoreException
+        {
+            if (store.containsKey(key))
+            {
+                throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+            }
+            store.put(key,value);
+        }
+
+        @Override
+        public Integer retrieve(Serializable key) throws ObjectStoreException
+        {
+            return store.get(key);
+        }
+
+        @Override
+        public Integer remove(Serializable key) throws ObjectStoreException
+        {
+            return store.remove(key);
+        }
+
+        @Override
+        public boolean isPersistent()
+        {
+            return false;
+        }
+    }
+
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style


To unsubscribe from this list please visit:

http://xircles.codehaus.org/manage_email





To start a new topic under Mule - Dev, email [hidden email]
To unsubscribe from Mule - Dev, click here.
NAML

Reply | Threaded
Open this post in threaded view
|

Re: [mule-scm] [mule][24766] branches/mule-3.2.x/core/src: MULE-6403 - creating a locking mechanism for object store.

Pablo Kraan
In reply to this post by Pablo Kraan
Hi Pablo LG,

The added test needs to be reviewed ASAP: it broke http://bamboo.mulesoft.org/browse/MULE33X-JDK6-202/ and hung

Pablo K

On Sat, Aug 18, 2012 at 7:59 PM, Pablo Kraan <[hidden email]> wrote:

Hi Pablo,

Here you have my comment on this revision: big commit -> big review


LockableObjectStore: not sure the name is right, you are not locking the OS but only a given key.

I would use lock(Serializable key) and unlock(Serializable key) instead of lockEntry and releaseEntry.  (or acquire/release)


LockEntry's javadoc is wrong on this part "read/update this key". Should say read/remove this key as there is no update operation in a OS.

What happens when there is no object in the store with that key? That must be also defined in the javadoc.


RelaseEntry: what happens if the key is not locked or is locked on a different thread? that info should be on the javadoc.


DefaultLockableObjectStore looks more like a LockableObjectStoreAdapter, ie, a way to convert a standard OS into a lockable one.

DefaultLockableObjectStore must lock/release each key before operating on it, otherwise the lock mechanism is useless.

The usage of the "this" keyword is redundant in many places.


MuleEntryLocker: name must be more generic because this interface can be used to lock anything, not just "entries.

Javadoc must be improved to define the contract more explicitly.

After seeing this interface Is not clear to me why you need to define a LockableObjectStore if you can just implement MuleEntryLocker in DefaultLockableObjectStore


MuleServerEntryLocker:

acquireLock must be final to avoid possible bugs.

Remove the redundant usage of "this"

The lock/release schema seems weak to me: any call to release will work without checking if the caller is the thread that holds the lock. I would expect this methods work similar to how the lock/unlock methods work on ReentrantLock.


I think there are a couple of race conditions on MuleServerEntryLockerTestCase when the key is not in the store:

1) The worker thread does lockableObjectStore.lock(key); and then it does a lockableObjectStore.release(key); on the finally block. In this case there will be an exception when the thread did not locked the key (what not in the store) but another thread locked it.

2) The worker thread does objectStore.contains(key)  and then objectStore.store(key,value + 1);. In this case there will be an exception when the object was already added in a different thread


Seems to me that the API must be extended in some way, maybe the lock method should create a lock even when the key is not there in order to avoid the race condition.


Pablo


On Fri, Aug 17, 2012 at 4:04 PM, <[hidden email]> wrote:
Revision
24766
Author
pablo.lagreca
Date
2012-08-17 14:04:00 -0500 (Fri, 17 Aug 2012)

Log Message

MULE-6403 - creating a locking mechanism for object store. Adding locking mechanism to IdempotentRedeliveryPolicy in order to avoid concurrency issues

Modified Paths

Added Paths

Diff

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -130,6 +130,7 @@
     public static final String OBJECT_QUEUE_MANAGER = "_muleQueueManager";
     public static final String OBJECT_STORE_DEFAULT_IN_MEMORY_NAME = "_defaultInMemoryObjectStore";
     public static final String OBJECT_STORE_DEFAULT_PERSISTENT_NAME = "_defaultPersistentObjectStore";
+    public static final String OBJECT_STORE_DEFAULT_LOCKER = "_defaultObjectStoreLocker";
     public static final String QUEUE_STORE_DEFAULT_IN_MEMORY_NAME = "_defaultInMemoryQueueStore";
     public static final String QUEUE_STORE_DEFAULT_PERSISTENT_NAME = "_defaultPersistentQueueStore";
     public static final String DEFAULT_USER_OBJECT_STORE_NAME = "_defaultUserObjectStore";

Added: branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,26 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.api.store;
+
+import java.io.Serializable;
+
+public interface LockableObjectStore<T extends Serializable> extends ObjectStore<T>
+{
+    /**
+     * Locks a key in the store so no other thread can read/update this key until is released.
+     */
+    void lockEntry(Serializable key);
+
+    /**
+     * Unlock a key in the store so other threads can access the entry associated with the key.
+     */
+    void releaseEntry(Serializable key);
+
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/store/ObjectStoreManager.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/store/ObjectStoreManager.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/store/ObjectStoreManager.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -37,4 +37,10 @@
      * Delete all objects from the partition
      */
     void disposeStore(ObjectStore<? extends Serializable> store) throws ObjectStoreException;
+
+    /**
+     * Return a LockableObjectStore using the mule default locker and the provided object store.
+     */
+    <T extends LockableObjectStore<? extends Serializable>> T getLockableObjectStore(ObjectStore<? extends Serializable> objectStore);
+
 }

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -49,7 +49,7 @@
     @Override
     public void initialise() throws InitialisationException
     {
-        if (maxRedeliveryCount < 1)
+        if (maxRedeliveryCount < 0)
         {
             throw new InitialisationException(
                 CoreMessages.initialisationFailure(

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -16,7 +16,7 @@
 import org.mule.api.lifecycle.InitialisationException;
 import org.mule.api.lifecycle.Startable;
 import org.mule.api.processor.MessageProcessor;
-import org.mule.api.store.ObjectStore;
+import org.mule.api.store.LockableObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
 import org.mule.api.transformer.TransformerException;
@@ -41,6 +41,11 @@
  */
 public class IdempotentRedeliveryPolicy extends AbstractRedeliveryPolicy
 {
+    private static final boolean OBJECT_STORE_NO_PERSISTENCE = false;
+    private static final int OBJECT_STORE_NO_ENTRY_LIMIT = -1;
+    private static final int OBJECT_STORE_FIVE_MINUTES_TTL = 60 * 5 * 1000;
+    private static final int OBJECT_STORE_EXPIRATION_INTERVAL = 6000;
+
     private final ObjectToByteArray objectToByteArray = new ObjectToByteArray();
     private final ByteArrayToHexString byteArrayToHexString = new ByteArrayToHexString();
 
@@ -49,7 +54,7 @@
     private boolean useSecureHash;
     private String messageDigestAlgorithm;
     private String idExpression;
-    private ObjectStore<AtomicInteger> store;
+    private LockableObjectStore<AtomicInteger> store;
 
     @Override
     public void initialise() throws InitialisationException
@@ -98,11 +103,12 @@
         store = createStore();
     }
 
-    private ObjectStore<AtomicInteger> createStore() throws InitialisationException
+    private LockableObjectStore<AtomicInteger> createStore() throws InitialisationException
     {
         ObjectStoreManager objectStoreManager = (ObjectStoreManager) muleContext.getRegistry().get(
                                 MuleProperties.OBJECT_STORE_MANAGER);
-        return objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(), false, -1,  60 * 5 * 1000, 6000 );
+        return objectStoreManager.getLockableObjectStore(objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(),
+                OBJECT_STORE_NO_PERSISTENCE, OBJECT_STORE_NO_ENTRY_LIMIT, OBJECT_STORE_FIVE_MINUTES_TTL, OBJECT_STORE_EXPIRATION_INTERVAL));
     }
 
 
@@ -165,45 +171,55 @@
             exceptionSeen = true;
         }
 
-        if (!exceptionSeen)
+        try
         {
-            counter = findCounter(messageId);
-            tooMany = counter != null && counter.get() > maxRedeliveryCount;
-        }
+            store.lockEntry(messageId);
 
-        if (tooMany || exceptionSeen)
-        {
-            try
+            if (!exceptionSeen)
             {
-                return deadLetterQueue.process(event);
+                counter = findCounter(messageId);
+                tooMany = counter != null && counter.get() > maxRedeliveryCount;
             }
-            catch (Exception ex)
-            {
-                logger.info("Exception thrown from failed message processing for message " + messageId, ex);
-            }
-            return null;
-        }
 
-        try
-        {
-            MuleEvent returnEvent = processNext(event);
-            counter = findCounter(messageId);
-            if (counter != null)
-            {
-                resetCounter(messageId);
-            }
-            return returnEvent;
+            if (tooMany || exceptionSeen)
+                {
+                    try
+                    {
+                        return deadLetterQueue.process(event);
+                    }
+                    catch (Exception ex)
+                    {
+                        logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                    }
+                    return null;
+                }
+        
+                try
+                {
+                    MuleEvent returnEvent = processNext(event);
+                    counter = findCounter(messageId);
+                    if (counter != null)
+                    {
+                        resetCounter(messageId);
+                    }
+                    return returnEvent;
+                }
+                catch (MuleException ex)
+                {
+                    incrementCounter(messageId);
+                    throw ex;
+                }
+                catch (RuntimeException ex)
+                {
+                    incrementCounter(messageId);
+                    throw ex;
+                }
         }
-        catch (MuleException ex)
+        finally 
         {
-            incrementCounter(messageId);
-            throw ex;
+            store.releaseEntry(messageId);
         }
-        catch (RuntimeException ex)
-        {
-            incrementCounter(messageId);
-            throw ex;
-        }
+        
     }
 
     private void resetCounter(String messageId) throws ObjectStoreException

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,70 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+import org.mule.api.store.LockableObjectStore;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+
+import java.io.Serializable;
+
+public class DefaultLockableObjectStore<T extends Serializable> implements LockableObjectStore<T>
+{
+    private ObjectStore<T> objectStore;
+    private MuleEntryLocker entryLocker;
+
+    public DefaultLockableObjectStore(ObjectStore<T> objectStore, MuleEntryLocker muleEntryLocker)
+    {
+        this.objectStore = objectStore;
+        this.entryLocker = muleEntryLocker;
+    }
+
+    @Override
+    public void lockEntry(Serializable key)
+    {
+        this.entryLocker.lock(key);
+    }
+
+    @Override
+    public void releaseEntry(Serializable key)
+    {
+        this.entryLocker.release(key);
+    }
+
+    @Override
+    public boolean contains(Serializable key) throws ObjectStoreException
+    {
+        return objectStore.contains(key);
+    }
+
+    @Override
+    public void store(Serializable key, T value) throws ObjectStoreException
+    {
+        objectStore.store(key,value);
+    }
+
+    @Override
+    public T retrieve(Serializable key) throws ObjectStoreException
+    {
+        return objectStore.retrieve(key);
+    }
+
+    @Override
+    public T remove(Serializable key) throws ObjectStoreException
+    {
+        return objectStore.remove(key);
+    }
+
+    @Override
+    public boolean isPersistent()
+    {
+        return objectStore.isPersistent();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,29 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+/**
+ *
+ * Provides a lock mechanism for share information in Mule.
+ *
+ */
+public interface MuleEntryLocker<T>
+{
+    /**
+     * Creates a lock around a lockIdentifier.
+     * To release lock use release method with the same identifier
+     */
+    public void lock(T lockIdentifier);
+
+    /**
+     *  Releases a lock previously locked.
+     */
+    public void release(T lockIdentifier);
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleObjectStoreManager.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleObjectStoreManager.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleObjectStoreManager.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -10,6 +10,8 @@
 
 package org.mule.util.store;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.mule.api.MuleContext;
 import org.mule.api.MuleRuntimeException;
 import org.mule.api.config.MuleProperties;
@@ -18,6 +20,7 @@
 import org.mule.api.lifecycle.Initialisable;
 import org.mule.api.lifecycle.InitialisationException;
 import org.mule.api.store.ListableObjectStore;
+import org.mule.api.store.LockableObjectStore;
 import org.mule.api.store.ObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
@@ -32,9 +35,6 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 public class MuleObjectStoreManager
     implements ObjectStoreManager, MuleContextAware, Initialisable, Disposable
 {
@@ -259,4 +259,20 @@
             }
         }
     }
+
+    @Override
+    public <T extends LockableObjectStore<? extends Serializable>> T getLockableObjectStore(ObjectStore<? extends Serializable> objectStore)
+    {
+        LockableObjectStore lockableObjectStore;
+        if (objectStore instanceof LockableObjectStore)
+        {
+            lockableObjectStore = (LockableObjectStore) objectStore;
+        }
+        else
+        {
+            MuleEntryLocker muleEntryLocker = muleContext.getRegistry().get(MuleProperties.OBJECT_STORE_DEFAULT_LOCKER);
+            lockableObjectStore = new DefaultLockableObjectStore(objectStore, muleEntryLocker);
+        }
+        return (T) lockableObjectStore;
+    }
 }

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,60 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class MuleServerEntryLocker implements MuleEntryLocker<Serializable>
+{
+    private Map<Serializable, ReentrantLock> locks;
+    private Object acquireLock = new Object();
+
+    public MuleServerEntryLocker()
+    {
+        this.locks = new HashMap<Serializable,ReentrantLock>();
+    }
+
+    public void lock(Serializable key)
+    {
+        ReentrantLock lock;
+        synchronized (acquireLock)
+        {
+            if (this.locks.containsKey(key))
+            {
+                lock = this.locks.get(key);
+            }
+            else
+            {
+                lock = new ReentrantLock(true);
+                this.locks.put(key,lock);
+            }
+        }
+        lock.lock();
+    }
+
+    public void release(Serializable key)
+    {
+        synchronized (acquireLock)
+        {
+            ReentrantLock reentrantLock = this.locks.get(key);
+            if (reentrantLock != null)
+            {
+                if (!reentrantLock.hasQueuedThreads())
+                {
+                    this.locks.remove(key);
+                }
+                reentrantLock.unlock();
+            }
+        }
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -10,71 +10,97 @@
 
 package org.mule.processor;
 
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
+import junit.framework.Assert;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
 import org.mockito.Answers;
-import org.mockito.Mockito;
 import org.mockito.internal.verification.VerificationModeFactory;
-import org.mule.api.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
 import org.mule.api.config.MuleProperties;
 import org.mule.api.construct.FlowConstruct;
 import org.mule.api.processor.MessageProcessor;
+import org.mule.api.store.LockableObjectStore;
 import org.mule.api.store.ObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
-import org.mule.routing.MessageProcessorFilterPair;
 import org.mule.tck.junit4.AbstractMuleTestCase;
-
-import org.junit.Test;
-
-import junit.framework.Assert;
+import org.mule.tck.junit4.rule.SystemProperty;
 import org.mule.util.SerializationUtils;
+import org.mule.util.concurrent.Latch;
+import org.mule.util.store.MuleServerEntryLocker;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
 {
 
     public static final String STRING_MESSAGE = "message";
-    public static final int MAX_REDELIVERY_COUNT = 1;
+    public static final int MAX_REDELIVERY_COUNT = 0;
     private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
     private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
     private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
-    private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+    private Latch waitLatch = new Latch();
+    private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+    private final IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
 
+    @Rule
+    public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+
     @Before
     public void setUpTest() throws MuleException
     {
         when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
-        System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+        when(mockWaitingMessageProcessor.process(event)).thenAnswer(new Answer<MuleEvent>()
+        {
+            @Override
+            public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                waitingMessageProcessorExecutionLatch.countDown();
+                waitLatch.await(2000, TimeUnit.MILLISECONDS);
+                return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+            }
+        });
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+        InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+        when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+        when(mockObjectStoreManager.getLockableObjectStore(inMemoryObjectStore)).thenReturn(inMemoryObjectStore);
+        when(event.getMessage()).thenReturn(message);
+        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+        irp.setUseSecureHash(true);
+        irp.setFlowConstruct(mock(FlowConstruct.class));
+        irp.setMuleContext(mockMuleContext);
+        irp.setListener(mockFailingMessageProcessor);
+        irp.setMessageProcessor(mockDlqMessageProcessor);
+
     }
 
     @Test
     public void messageDigestFailure() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
+        when(message.getPayload()).thenReturn(new Object());
         irp.initialise();
-
-
-        when(message.getPayload()).thenReturn(new Object());
-
-        when(event.getMessage()).thenReturn(message);
         MuleEvent process = irp.process(event);
         Assert.assertNull(process);
     }
@@ -82,22 +108,43 @@
     @Test
     public void testMessageRedeliveryUsingMemory() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
-        irp.setUseSecureHash(true);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
+    @Test
+    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    {
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        when(mockObjectStoreManager.getLockableObjectStore(any(ObjectStore.class))).thenReturn(new SerializationObjectStore());
         irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
+    @Test
+    public void testThreadSafeObjectStoreUsage() throws Exception
+    {
         when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
+        irp.setListener(mockWaitingMessageProcessor);
+        irp.initialise();
 
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+        firstIrpExecutionThread.start();
+        ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+        threadCausingRedeliveryException.start();
+        waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+        waitLatch.release();
+        firstIrpExecutionThread.join();
+        threadCausingRedeliveryException.join();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
+
+    private void processUntilFailure()
+    {
+        for (int i = 0; i < MAX_REDELIVERY_COUNT + 2; i++)
         {
             try
             {
@@ -107,28 +154,14 @@
             {
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
 
-    @Test
-    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    public class ExecuteIrpThread extends Thread
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
-        irp.initialise();
-
-        when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
-
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        public Exception exception;
+        
+        @Override
+        public void run()
         {
             try
             {
@@ -136,15 +169,18 @@
             }
             catch (Exception e)
             {
+                exception = e;
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
+   
+    
 
-    public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
+    public static class SerializationObjectStore implements LockableObjectStore<AtomicInteger>
     {
 
         private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+        private MuleServerEntryLocker lockableObjectStore = new MuleServerEntryLocker();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -177,11 +213,24 @@
         {
             return false;
         }
+
+        @Override
+        public void lockEntry(Serializable key)
+        {
+            lockableObjectStore.lock(key);
+        }
+
+        @Override
+        public void releaseEntry(Serializable key)
+        {
+            lockableObjectStore.release(key);
+        }
     }
 
-    public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
+    public static class InMemoryObjectStore implements LockableObjectStore<AtomicInteger>
     {
         private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+        private MuleServerEntryLocker lockableObjectStore = new MuleServerEntryLocker();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -212,6 +261,18 @@
         {
             return false;
         }
+
+        @Override
+        public void lockEntry(Serializable key)
+        {
+            lockableObjectStore.lock(key);
+        }
+
+        @Override
+        public void releaseEntry(Serializable key)
+        {
+            lockableObjectStore.release(key);
+        }
     }
 
 }

Added: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (0 => 24766)


--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,90 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+    private final String name;
+    private String value;
+    private boolean initialized;
+    private String oldValue;
+
+    public SystemProperty(String name)
+    {
+        this(name, null);
+    }
+
+    public SystemProperty(String name, String value)
+    {
+        this.name = name;
+        this.value = value;
+    }
+
+    @Override
+    protected void before() throws Throwable
+    {
+        if (initialized)
+        {
+            throw new IllegalArgumentException("System property was already initialized");
+        }
+
+        oldValue = System.setProperty(name, getValue());
+        initialized = true;
+    }
+
+    @Override
+    protected void after()
+    {
+        if (!initialized)
+        {
+            throw new IllegalArgumentException("System property was not initialized");
+        }
+
+        doCleanUp();
+        restoreOldValue();
+
+        initialized = false;
+    }
+
+    protected void restoreOldValue()
+    {
+        if (oldValue == null)
+        {
+            System.clearProperty(name);
+        }
+        else
+        {
+            System.setProperty(name, oldValue);
+        }
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    protected void doCleanUp()
+    {
+        // Nothing to do
+    };
+
+    public String getValue()
+    {
+        return value;
+    };
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java (0 => 24766)


--- branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,148 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class MuleServerEntryLockerTestCase
+{
+    public static final int THREAD_COUNT = 100;
+    public static final int ITERATIONS_PER_THREAD = 1000;
+    private String sharedKeyA = "A";
+    private String sharedKeyB = "B";
+    private MuleServerEntryLocker lockableObjectStore = new MuleServerEntryLocker();
+    private InMemoryObjectStore objectStore  = new InMemoryObjectStore();
+    private Latch threadStartLatch = new Latch();
+
+
+    
+    @Test
+    public void testHighConcurrency() throws Exception
+    {
+        List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+        for (int i = 0; i < THREAD_COUNT; i++)
+        {
+            IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+            incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+        }
+        threadStartLatch.release();
+        for (Thread thread : threads)
+        {
+            thread.join();
+        }
+        assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+        assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+    }
+    
+    public class IncrementKeyValueThread extends Thread
+    {
+        private String key;
+
+        public IncrementKeyValueThread(String key)
+        {
+            super("Thread-" + key);
+            this.key = key;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+                for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+                {
+                    try
+                    {
+                        lockableObjectStore.lock(key);
+                        Integer value;
+                        if (objectStore.contains(key))
+                        {
+                            value = objectStore.retrieve(key);
+                            objectStore.remove(key);
+                        }
+                        else
+                        {
+                            value = 0;
+                        }
+                        objectStore.store(key,value + 1);
+                    }
+                    finally
+                    {
+                        lockableObjectStore.release(key);
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static class InMemoryObjectStore implements ObjectStore<Integer>
+    {
+        private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+        @Override
+        public boolean contains(Serializable key) throws ObjectStoreException
+        {
+            return store.containsKey(key);
+        }
+
+        @Override
+        public void store(Serializable key, Integer value) throws ObjectStoreException
+        {
+            if (store.containsKey(key))
+            {
+                throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+            }
+            store.put(key,value);
+        }
+
+        @Override
+        public Integer retrieve(Serializable key) throws ObjectStoreException
+        {
+            return store.get(key);
+        }
+
+        @Override
+        public Integer remove(Serializable key) throws ObjectStoreException
+        {
+            return store.remove(key);
+        }
+
+        @Override
+        public boolean isPersistent()
+        {
+            return false;
+        }
+    }
+
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style


To unsubscribe from this list please visit:

http://xircles.codehaus.org/manage_email



Reply | Threaded
Open this post in threaded view
|

Re: [mule-scm] [mule][24766] branches/mule-3.2.x/core/src: MULE-6403 - creating a locking mechanism for object store.

Pablo La Greca
I'm going to revert all my changes around ObjectStore. For 3.3.1 I will only provide a locking mechanism and use that directly from IdempotentRedeliveryPolicy. 

ObjectStore hierarchy is a mess. I prefer not to add anything related to locking until we refactor ObjectStore implementation.

On Mon, Aug 20, 2012 at 7:21 PM, Pablo Kraan <[hidden email]> wrote:
Hi Pablo LG,

The added test needs to be reviewed ASAP: it broke http://bamboo.mulesoft.org/browse/MULE33X-JDK6-202/ and hung

Pablo K


On Sat, Aug 18, 2012 at 7:59 PM, Pablo Kraan <[hidden email]> wrote:

Hi Pablo,

Here you have my comment on this revision: big commit -> big review


LockableObjectStore: not sure the name is right, you are not locking the OS but only a given key.

I would use lock(Serializable key) and unlock(Serializable key) instead of lockEntry and releaseEntry.  (or acquire/release)


LockEntry's javadoc is wrong on this part "read/update this key". Should say read/remove this key as there is no update operation in a OS.

What happens when there is no object in the store with that key? That must be also defined in the javadoc.


RelaseEntry: what happens if the key is not locked or is locked on a different thread? that info should be on the javadoc.


DefaultLockableObjectStore looks more like a LockableObjectStoreAdapter, ie, a way to convert a standard OS into a lockable one.

DefaultLockableObjectStore must lock/release each key before operating on it, otherwise the lock mechanism is useless.

The usage of the "this" keyword is redundant in many places.


MuleEntryLocker: name must be more generic because this interface can be used to lock anything, not just "entries.

Javadoc must be improved to define the contract more explicitly.

After seeing this interface Is not clear to me why you need to define a LockableObjectStore if you can just implement MuleEntryLocker in DefaultLockableObjectStore


MuleServerEntryLocker:

acquireLock must be final to avoid possible bugs.

Remove the redundant usage of "this"

The lock/release schema seems weak to me: any call to release will work without checking if the caller is the thread that holds the lock. I would expect this methods work similar to how the lock/unlock methods work on ReentrantLock.


I think there are a couple of race conditions on MuleServerEntryLockerTestCase when the key is not in the store:

1) The worker thread does lockableObjectStore.lock(key); and then it does a lockableObjectStore.release(key); on the finally block. In this case there will be an exception when the thread did not locked the key (what not in the store) but another thread locked it.

2) The worker thread does objectStore.contains(key)  and then objectStore.store(key,value + 1);. In this case there will be an exception when the object was already added in a different thread


Seems to me that the API must be extended in some way, maybe the lock method should create a lock even when the key is not there in order to avoid the race condition.


Pablo


On Fri, Aug 17, 2012 at 4:04 PM, <[hidden email]> wrote:
Revision
24766
Author
pablo.lagreca
Date
2012-08-17 14:04:00 -0500 (Fri, 17 Aug 2012)

Log Message

MULE-6403 - creating a locking mechanism for object store. Adding locking mechanism to IdempotentRedeliveryPolicy in order to avoid concurrency issues

Modified Paths

Added Paths

Diff

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -130,6 +130,7 @@
     public static final String OBJECT_QUEUE_MANAGER = "_muleQueueManager";
     public static final String OBJECT_STORE_DEFAULT_IN_MEMORY_NAME = "_defaultInMemoryObjectStore";
     public static final String OBJECT_STORE_DEFAULT_PERSISTENT_NAME = "_defaultPersistentObjectStore";
+    public static final String OBJECT_STORE_DEFAULT_LOCKER = "_defaultObjectStoreLocker";
     public static final String QUEUE_STORE_DEFAULT_IN_MEMORY_NAME = "_defaultInMemoryQueueStore";
     public static final String QUEUE_STORE_DEFAULT_PERSISTENT_NAME = "_defaultPersistentQueueStore";
     public static final String DEFAULT_USER_OBJECT_STORE_NAME = "_defaultUserObjectStore";

Added: branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,26 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.api.store;
+
+import java.io.Serializable;
+
+public interface LockableObjectStore<T extends Serializable> extends ObjectStore<T>
+{
+    /**
+     * Locks a key in the store so no other thread can read/update this key until is released.
+     */
+    void lockEntry(Serializable key);
+
+    /**
+     * Unlock a key in the store so other threads can access the entry associated with the key.
+     */
+    void releaseEntry(Serializable key);
+
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/api/store/LockableObjectStore.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/api/store/ObjectStoreManager.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/api/store/ObjectStoreManager.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/api/store/ObjectStoreManager.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -37,4 +37,10 @@
      * Delete all objects from the partition
      */
     void disposeStore(ObjectStore<? extends Serializable> store) throws ObjectStoreException;
+
+    /**
+     * Return a LockableObjectStore using the mule default locker and the provided object store.
+     */
+    <T extends LockableObjectStore<? extends Serializable>> T getLockableObjectStore(ObjectStore<? extends Serializable> objectStore);
+
 }

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/AbstractRedeliveryPolicy.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -49,7 +49,7 @@
     @Override
     public void initialise() throws InitialisationException
     {
-        if (maxRedeliveryCount < 1)
+        if (maxRedeliveryCount < 0)
         {
             throw new InitialisationException(
                 CoreMessages.initialisationFailure(

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -16,7 +16,7 @@
 import org.mule.api.lifecycle.InitialisationException;
 import org.mule.api.lifecycle.Startable;
 import org.mule.api.processor.MessageProcessor;
-import org.mule.api.store.ObjectStore;
+import org.mule.api.store.LockableObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
 import org.mule.api.transformer.TransformerException;
@@ -41,6 +41,11 @@
  */
 public class IdempotentRedeliveryPolicy extends AbstractRedeliveryPolicy
 {
+    private static final boolean OBJECT_STORE_NO_PERSISTENCE = false;
+    private static final int OBJECT_STORE_NO_ENTRY_LIMIT = -1;
+    private static final int OBJECT_STORE_FIVE_MINUTES_TTL = 60 * 5 * 1000;
+    private static final int OBJECT_STORE_EXPIRATION_INTERVAL = 6000;
+
     private final ObjectToByteArray objectToByteArray = new ObjectToByteArray();
     private final ByteArrayToHexString byteArrayToHexString = new ByteArrayToHexString();
 
@@ -49,7 +54,7 @@
     private boolean useSecureHash;
     private String messageDigestAlgorithm;
     private String idExpression;
-    private ObjectStore<AtomicInteger> store;
+    private LockableObjectStore<AtomicInteger> store;
 
     @Override
     public void initialise() throws InitialisationException
@@ -98,11 +103,12 @@
         store = createStore();
     }
 
-    private ObjectStore<AtomicInteger> createStore() throws InitialisationException
+    private LockableObjectStore<AtomicInteger> createStore() throws InitialisationException
     {
         ObjectStoreManager objectStoreManager = (ObjectStoreManager) muleContext.getRegistry().get(
                                 MuleProperties.OBJECT_STORE_MANAGER);
-        return objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(), false, -1,  60 * 5 * 1000, 6000 );
+        return objectStoreManager.getLockableObjectStore(objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(),
+                OBJECT_STORE_NO_PERSISTENCE, OBJECT_STORE_NO_ENTRY_LIMIT, OBJECT_STORE_FIVE_MINUTES_TTL, OBJECT_STORE_EXPIRATION_INTERVAL));
     }
 
 
@@ -165,45 +171,55 @@
             exceptionSeen = true;
         }
 
-        if (!exceptionSeen)
+        try
         {
-            counter = findCounter(messageId);
-            tooMany = counter != null && counter.get() > maxRedeliveryCount;
-        }
+            store.lockEntry(messageId);
 
-        if (tooMany || exceptionSeen)
-        {
-            try
+            if (!exceptionSeen)
             {
-                return deadLetterQueue.process(event);
+                counter = findCounter(messageId);
+                tooMany = counter != null && counter.get() > maxRedeliveryCount;
             }
-            catch (Exception ex)
-            {
-                logger.info("Exception thrown from failed message processing for message " + messageId, ex);
-            }
-            return null;
-        }
 
-        try
-        {
-            MuleEvent returnEvent = processNext(event);
-            counter = findCounter(messageId);
-            if (counter != null)
-            {
-                resetCounter(messageId);
-            }
-            return returnEvent;
+            if (tooMany || exceptionSeen)
+                {
+                    try
+                    {
+                        return deadLetterQueue.process(event);
+                    }
+                    catch (Exception ex)
+                    {
+                        logger.info("Exception thrown from failed message processing for message " + messageId, ex);
+                    }
+                    return null;
+                }
+        
+                try
+                {
+                    MuleEvent returnEvent = processNext(event);
+                    counter = findCounter(messageId);
+                    if (counter != null)
+                    {
+                        resetCounter(messageId);
+                    }
+                    return returnEvent;
+                }
+                catch (MuleException ex)
+                {
+                    incrementCounter(messageId);
+                    throw ex;
+                }
+                catch (RuntimeException ex)
+                {
+                    incrementCounter(messageId);
+                    throw ex;
+                }
         }
-        catch (MuleException ex)
+        finally 
         {
-            incrementCounter(messageId);
-            throw ex;
+            store.releaseEntry(messageId);
         }
-        catch (RuntimeException ex)
-        {
-            incrementCounter(messageId);
-            throw ex;
-        }
+        
     }
 
     private void resetCounter(String messageId) throws ObjectStoreException

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,70 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+import org.mule.api.store.LockableObjectStore;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+
+import java.io.Serializable;
+
+public class DefaultLockableObjectStore<T extends Serializable> implements LockableObjectStore<T>
+{
+    private ObjectStore<T> objectStore;
+    private MuleEntryLocker entryLocker;
+
+    public DefaultLockableObjectStore(ObjectStore<T> objectStore, MuleEntryLocker muleEntryLocker)
+    {
+        this.objectStore = objectStore;
+        this.entryLocker = muleEntryLocker;
+    }
+
+    @Override
+    public void lockEntry(Serializable key)
+    {
+        this.entryLocker.lock(key);
+    }
+
+    @Override
+    public void releaseEntry(Serializable key)
+    {
+        this.entryLocker.release(key);
+    }
+
+    @Override
+    public boolean contains(Serializable key) throws ObjectStoreException
+    {
+        return objectStore.contains(key);
+    }
+
+    @Override
+    public void store(Serializable key, T value) throws ObjectStoreException
+    {
+        objectStore.store(key,value);
+    }
+
+    @Override
+    public T retrieve(Serializable key) throws ObjectStoreException
+    {
+        return objectStore.retrieve(key);
+    }
+
+    @Override
+    public T remove(Serializable key) throws ObjectStoreException
+    {
+        return objectStore.remove(key);
+    }
+
+    @Override
+    public boolean isPersistent()
+    {
+        return objectStore.isPersistent();
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/DefaultLockableObjectStore.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,29 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+/**
+ *
+ * Provides a lock mechanism for share information in Mule.
+ *
+ */
+public interface MuleEntryLocker<T>
+{
+    /**
+     * Creates a lock around a lockIdentifier.
+     * To release lock use release method with the same identifier
+     */
+    public void lock(T lockIdentifier);
+
+    /**
+     *  Releases a lock previously locked.
+     */
+    public void release(T lockIdentifier);
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleEntryLocker.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleObjectStoreManager.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleObjectStoreManager.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleObjectStoreManager.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -10,6 +10,8 @@
 
 package org.mule.util.store;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.mule.api.MuleContext;
 import org.mule.api.MuleRuntimeException;
 import org.mule.api.config.MuleProperties;
@@ -18,6 +20,7 @@
 import org.mule.api.lifecycle.Initialisable;
 import org.mule.api.lifecycle.InitialisationException;
 import org.mule.api.store.ListableObjectStore;
+import org.mule.api.store.LockableObjectStore;
 import org.mule.api.store.ObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
@@ -32,9 +35,6 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 public class MuleObjectStoreManager
     implements ObjectStoreManager, MuleContextAware, Initialisable, Disposable
 {
@@ -259,4 +259,20 @@
             }
         }
     }
+
+    @Override
+    public <T extends LockableObjectStore<? extends Serializable>> T getLockableObjectStore(ObjectStore<? extends Serializable> objectStore)
+    {
+        LockableObjectStore lockableObjectStore;
+        if (objectStore instanceof LockableObjectStore)
+        {
+            lockableObjectStore = (LockableObjectStore) objectStore;
+        }
+        else
+        {
+            MuleEntryLocker muleEntryLocker = muleContext.getRegistry().get(MuleProperties.OBJECT_STORE_DEFAULT_LOCKER);
+            lockableObjectStore = new DefaultLockableObjectStore(objectStore, muleEntryLocker);
+        }
+        return (T) lockableObjectStore;
+    }
 }

Added: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java (0 => 24766)


--- branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,60 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class MuleServerEntryLocker implements MuleEntryLocker<Serializable>
+{
+    private Map<Serializable, ReentrantLock> locks;
+    private Object acquireLock = new Object();
+
+    public MuleServerEntryLocker()
+    {
+        this.locks = new HashMap<Serializable,ReentrantLock>();
+    }
+
+    public void lock(Serializable key)
+    {
+        ReentrantLock lock;
+        synchronized (acquireLock)
+        {
+            if (this.locks.containsKey(key))
+            {
+                lock = this.locks.get(key);
+            }
+            else
+            {
+                lock = new ReentrantLock(true);
+                this.locks.put(key,lock);
+            }
+        }
+        lock.lock();
+    }
+
+    public void release(Serializable key)
+    {
+        synchronized (acquireLock)
+        {
+            ReentrantLock reentrantLock = this.locks.get(key);
+            if (reentrantLock != null)
+            {
+                if (!reentrantLock.hasQueuedThreads())
+                {
+                    this.locks.remove(key);
+                }
+                reentrantLock.unlock();
+            }
+        }
+    }
+}
Property changes on: branches/mule-3.2.x/core/src/main/java/org/mule/util/store/MuleServerEntryLocker.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (24765 => 24766)


--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-17 18:26:46 UTC (rev 24765)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -10,71 +10,97 @@
 
 package org.mule.processor;
 
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
+import junit.framework.Assert;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
 import org.mockito.Answers;
-import org.mockito.Mockito;
 import org.mockito.internal.verification.VerificationModeFactory;
-import org.mule.api.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
 import org.mule.api.config.MuleProperties;
 import org.mule.api.construct.FlowConstruct;
 import org.mule.api.processor.MessageProcessor;
+import org.mule.api.store.LockableObjectStore;
 import org.mule.api.store.ObjectStore;
 import org.mule.api.store.ObjectStoreException;
 import org.mule.api.store.ObjectStoreManager;
-import org.mule.routing.MessageProcessorFilterPair;
 import org.mule.tck.junit4.AbstractMuleTestCase;
-
-import org.junit.Test;
-
-import junit.framework.Assert;
+import org.mule.tck.junit4.rule.SystemProperty;
 import org.mule.util.SerializationUtils;
+import org.mule.util.concurrent.Latch;
+import org.mule.util.store.MuleServerEntryLocker;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
 {
 
     public static final String STRING_MESSAGE = "message";
-    public static final int MAX_REDELIVERY_COUNT = 1;
+    public static final int MAX_REDELIVERY_COUNT = 0;
     private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
     private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
     private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
-    private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockWaitingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockDlqMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
     private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+    private Latch waitLatch = new Latch();
+    private CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
+    private final IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
 
+    @Rule
+    public SystemProperty systemProperty = new SystemProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+
     @Before
     public void setUpTest() throws MuleException
     {
         when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
-        System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+        when(mockWaitingMessageProcessor.process(event)).thenAnswer(new Answer<MuleEvent>()
+        {
+            @Override
+            public MuleEvent answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                waitingMessageProcessorExecutionLatch.countDown();
+                waitLatch.await(2000, TimeUnit.MILLISECONDS);
+                return mockFailingMessageProcessor.process((MuleEvent) invocationOnMock.getArguments()[0]);
+            }
+        });
+        when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+        InMemoryObjectStore inMemoryObjectStore = new InMemoryObjectStore();
+        when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(inMemoryObjectStore);
+        when(mockObjectStoreManager.getLockableObjectStore(inMemoryObjectStore)).thenReturn(inMemoryObjectStore);
+        when(event.getMessage()).thenReturn(message);
+        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+        irp.setUseSecureHash(true);
+        irp.setFlowConstruct(mock(FlowConstruct.class));
+        irp.setMuleContext(mockMuleContext);
+        irp.setListener(mockFailingMessageProcessor);
+        irp.setMessageProcessor(mockDlqMessageProcessor);
+
     }
 
     @Test
     public void messageDigestFailure() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
+        when(message.getPayload()).thenReturn(new Object());
         irp.initialise();
-
-
-        when(message.getPayload()).thenReturn(new Object());
-
-        when(event.getMessage()).thenReturn(message);
         MuleEvent process = irp.process(event);
         Assert.assertNull(process);
     }
@@ -82,22 +108,43 @@
     @Test
     public void testMessageRedeliveryUsingMemory() throws Exception
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
-        irp.setUseSecureHash(true);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
+    @Test
+    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    {
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        when(mockObjectStoreManager.getLockableObjectStore(any(ObjectStore.class))).thenReturn(new SerializationObjectStore());
         irp.initialise();
+        processUntilFailure();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
 
+    @Test
+    public void testThreadSafeObjectStoreUsage() throws Exception
+    {
         when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
+        irp.setListener(mockWaitingMessageProcessor);
+        irp.initialise();
 
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        ExecuteIrpThread firstIrpExecutionThread = new ExecuteIrpThread();
+        firstIrpExecutionThread.start();
+        ExecuteIrpThread threadCausingRedeliveryException = new ExecuteIrpThread();
+        threadCausingRedeliveryException.start();
+        waitingMessageProcessorExecutionLatch.await(5000, TimeUnit.MILLISECONDS);
+        waitLatch.release();
+        firstIrpExecutionThread.join();
+        threadCausingRedeliveryException.join();
+        verify(mockDlqMessageProcessor, VerificationModeFactory.times(1)).process(event);
+    }
+
+    private void processUntilFailure()
+    {
+        for (int i = 0; i < MAX_REDELIVERY_COUNT + 2; i++)
         {
             try
             {
@@ -107,28 +154,14 @@
             {
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
 
-    @Test
-    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    public class ExecuteIrpThread extends Thread
     {
-        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
-        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
-
-        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
-        irp.setUseSecureHash(true);
-        irp.setMaxRedeliveryCount(1);
-        irp.setFlowConstruct(mock(FlowConstruct.class));
-        irp.setMuleContext(mockMuleContext);
-        irp.setListener(mockFailingMessageProcessor);
-        irp.setDeadLetterQueue(mockDlqMessageProcessor);
-        irp.initialise();
-
-        when(message.getPayload()).thenReturn(STRING_MESSAGE);
-        when(event.getMessage()).thenReturn(message);
-
-        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        public Exception exception;
+        
+        @Override
+        public void run()
         {
             try
             {
@@ -136,15 +169,18 @@
             }
             catch (Exception e)
             {
+                exception = e;
             }
         }
-        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
     }
+   
+    
 
-    public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
+    public static class SerializationObjectStore implements LockableObjectStore<AtomicInteger>
     {
 
         private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+        private MuleServerEntryLocker lockableObjectStore = new MuleServerEntryLocker();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -177,11 +213,24 @@
         {
             return false;
         }
+
+        @Override
+        public void lockEntry(Serializable key)
+        {
+            lockableObjectStore.lock(key);
+        }
+
+        @Override
+        public void releaseEntry(Serializable key)
+        {
+            lockableObjectStore.release(key);
+        }
     }
 
-    public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
+    public static class InMemoryObjectStore implements LockableObjectStore<AtomicInteger>
     {
         private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+        private MuleServerEntryLocker lockableObjectStore = new MuleServerEntryLocker();
 
         @Override
         public boolean contains(Serializable key) throws ObjectStoreException
@@ -212,6 +261,18 @@
         {
             return false;
         }
+
+        @Override
+        public void lockEntry(Serializable key)
+        {
+            lockableObjectStore.lock(key);
+        }
+
+        @Override
+        public void releaseEntry(Serializable key)
+        {
+            lockableObjectStore.release(key);
+        }
     }
 
 }

Added: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java (0 => 24766)


--- branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,90 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.tck.junit4.rule;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * Sets up a system property before a test and guaranties to tear it down
+ * afterward.
+ */
+public class SystemProperty extends ExternalResource
+{
+
+    private final String name;
+    private String value;
+    private boolean initialized;
+    private String oldValue;
+
+    public SystemProperty(String name)
+    {
+        this(name, null);
+    }
+
+    public SystemProperty(String name, String value)
+    {
+        this.name = name;
+        this.value = value;
+    }
+
+    @Override
+    protected void before() throws Throwable
+    {
+        if (initialized)
+        {
+            throw new IllegalArgumentException("System property was already initialized");
+        }
+
+        oldValue = System.setProperty(name, getValue());
+        initialized = true;
+    }
+
+    @Override
+    protected void after()
+    {
+        if (!initialized)
+        {
+            throw new IllegalArgumentException("System property was not initialized");
+        }
+
+        doCleanUp();
+        restoreOldValue();
+
+        initialized = false;
+    }
+
+    protected void restoreOldValue()
+    {
+        if (oldValue == null)
+        {
+            System.clearProperty(name);
+        }
+        else
+        {
+            System.setProperty(name, oldValue);
+        }
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    protected void doCleanUp()
+    {
+        // Nothing to do
+    };
+
+    public String getValue()
+    {
+        return value;
+    };
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/tck/junit4/rule/SystemProperty.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java (0 => 24766)


--- branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java	2012-08-17 19:04:00 UTC (rev 24766)
@@ -0,0 +1,148 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.util.store;
+
+import org.junit.Test;
+import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.config.i18n.CoreMessages;
+import org.mule.util.concurrent.Latch;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class MuleServerEntryLockerTestCase
+{
+    public static final int THREAD_COUNT = 100;
+    public static final int ITERATIONS_PER_THREAD = 1000;
+    private String sharedKeyA = "A";
+    private String sharedKeyB = "B";
+    private MuleServerEntryLocker lockableObjectStore = new MuleServerEntryLocker();
+    private InMemoryObjectStore objectStore  = new InMemoryObjectStore();
+    private Latch threadStartLatch = new Latch();
+
+
+    
+    @Test
+    public void testHighConcurrency() throws Exception
+    {
+        List<Thread> threads = new ArrayList<Thread>(THREAD_COUNT);
+        for (int i = 0; i < THREAD_COUNT; i++)
+        {
+            IncrementKeyValueThread incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyA);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+            incrementKeyValueThread = new IncrementKeyValueThread(sharedKeyB);
+            threads.add(incrementKeyValueThread);
+            incrementKeyValueThread.start();
+        }
+        threadStartLatch.release();
+        for (Thread thread : threads)
+        {
+            thread.join();
+        }
+        assertThat(objectStore.retrieve(sharedKeyA), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+        assertThat(objectStore.retrieve(sharedKeyB), is(THREAD_COUNT * ITERATIONS_PER_THREAD));
+    }
+    
+    public class IncrementKeyValueThread extends Thread
+    {
+        private String key;
+
+        public IncrementKeyValueThread(String key)
+        {
+            super("Thread-" + key);
+            this.key = key;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                threadStartLatch.await(5000, TimeUnit.MILLISECONDS);
+                for (int i = 0; i < ITERATIONS_PER_THREAD; i ++)
+                {
+                    try
+                    {
+                        lockableObjectStore.lock(key);
+                        Integer value;
+                        if (objectStore.contains(key))
+                        {
+                            value = objectStore.retrieve(key);
+                            objectStore.remove(key);
+                        }
+                        else
+                        {
+                            value = 0;
+                        }
+                        objectStore.store(key,value + 1);
+                    }
+                    finally
+                    {
+                        lockableObjectStore.release(key);
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static class InMemoryObjectStore implements ObjectStore<Integer>
+    {
+        private Map<Serializable,Integer> store = new HashMap<Serializable,Integer>();
+
+        @Override
+        public boolean contains(Serializable key) throws ObjectStoreException
+        {
+            return store.containsKey(key);
+        }
+
+        @Override
+        public void store(Serializable key, Integer value) throws ObjectStoreException
+        {
+            if (store.containsKey(key))
+            {
+                throw new ObjectAlreadyExistsException(CoreMessages.createStaticMessage(""));
+            }
+            store.put(key,value);
+        }
+
+        @Override
+        public Integer retrieve(Serializable key) throws ObjectStoreException
+        {
+            return store.get(key);
+        }
+
+        @Override
+        public Integer remove(Serializable key) throws ObjectStoreException
+        {
+            return store.remove(key);
+        }
+
+        @Override
+        public boolean isPersistent()
+        {
+            return false;
+        }
+    }
+
+}
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/util/store/MuleServerEntryLockerTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style


To unsubscribe from this list please visit:

http://xircles.codehaus.org/manage_email