Re: [mule-scm] [mule][25119] branches/mule-3.x/transports/http/src: MULE-6562 - improving http socket handling.

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Re: [mule-scm] [mule][25119] branches/mule-3.x/transports/http/src: MULE-6562 - improving http socket handling.

Daniel Feist
Hi,

Where can I find doc/spec explaining these changes and how things now work with these changes.  This doc is important for both mule developers and advanced users, and ideally not just something that just gets implemented on the fly.

Dan

On Dec 10, 2012, at 6:20 PM, [hidden email] wrote:

[mule][25119] branches/mule-3.x/transports/http/src: MULE-6562 - improving http socket handling.
Revision
25119
Author
pablo.lagreca
Date
2012-12-10 15:20:57 -0600 (Mon, 10 Dec 2012)

Log Message

MULE-6562 - improving http socket handling. SocketServer connections are kept by HTTP connector and it serves each HTTP request to the appropiate HttpMessageReceiver after reading the RequestLine.

Modified Paths

  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrcmainjavaorgmuletransporthttpHttpConnectorjava">branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpConnector.java
  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrcmainjavaorgmuletransporthttpHttpMessageReceiverjava">branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageReceiver.java
  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrcmainjavaorgmuletransporthttpHttpRequestjava">branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequest.java
  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrcmainjavaorgmuletransporthttpHttpServerConnectionjava">branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpServerConnection.java
  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrcmainjavaorgmuletransporthttpHttpsMessageReceiverjava">branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageReceiver.java
  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrcmainjavaorgmuletransporthttpRequestLinejava">branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/RequestLine.java
  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrctestjavaorgmuletransporthttpHttpConnectorTestCasejava">branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectorTestCase.java
  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrctestjavaorgmuletransporthttpfunctionalHttpsHandshakeTimingTestCasejava">branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpsHandshakeTimingTestCase.java

Added Paths

  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrcmainjavaorgmuletransporthttpHttpConnectionManagerjava">branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpConnectionManager.java
  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrcmainjavaorgmuletransporthttpHttpRequestDispatcherjava">branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcher.java
  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrcmainjavaorgmuletransporthttpHttpRequestDispatcherWorkjava">branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcherWork.java
  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrctestjavaorgmuletransporthttpHttpConnectionManagerTestCasejava">branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectionManagerTestCase.java
  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrctestjavaorgmuletransporthttpHttpRequestDispatcherTestCasejava">branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherTestCase.java
  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrctestjavaorgmuletransporthttpHttpRequestDispatcherWorkTestCasejava">branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherWorkTestCase.java
  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrctestjavaorgmuletransporthttpHttpServerConnectionTestCasejava">branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpServerConnectionTestCase.java
  • <a href="x-msg://2221/#branchesmule3xtransportshttpsrctestjavaorgmuletransporthttpRequestLineTestCasejava">branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/RequestLineTestCase.java

Diff

Added: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpConnectionManager.java (0 => 25119)


--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpConnectionManager.java	                        (rev 0)
+++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpConnectionManager.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -0,0 +1,104 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.http;
+
+import org.mule.api.context.WorkManager;
+import org.mule.api.endpoint.EndpointURI;
+import org.mule.transport.ConnectException;
+
+import java.net.ServerSocket;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Manager {@link HttpRequestDispatcher} connections and disconnections to {@link EndpointURI}.
+ * <p/>
+ * Starts listening for HTTP request when at least one endpoint is associated to a given port are connected and
+ * stops listening for HTTP request when all endpoints associated to a given port are disconnected.
+ */
+class HttpConnectionManager
+{
+
+    private static final int LAST_CONNECTION = 1;
+    protected final Log logger = LogFactory.getLog(getClass());
+
+    final private HttpConnector connector;
+    final private Map<String, HttpRequestDispatcher> socketDispatchers = new HashMap<String, HttpRequestDispatcher>();
+    final private Map<String, Integer> socketDispatcherCount = new HashMap<String, Integer>();
+    final private WorkManager workManager;
+
+    public HttpConnectionManager(HttpConnector connector, WorkManager workManager)
+    {
+        if (connector == null)
+        {
+            throw new IllegalArgumentException("HttpConnector can not be null");
+        }
+        if (workManager == null)
+        {
+            throw new IllegalArgumentException("WorkManager can not be null");
+        }
+        this.connector = connector;
+        this.workManager = workManager;
+    }
+
+    synchronized void addConnection(final EndpointURI endpointURI) throws ConnectException
+    {
+        try
+        {
+            String endpointKey = getKeyForEndpointUri(endpointURI);
+            if (socketDispatchers.containsKey(endpointKey))
+            {
+                socketDispatcherCount.put(endpointKey, socketDispatcherCount.get(endpointKey) + 1);
+            }
+            else
+            {
+                ServerSocket serverSocket = connector.getServerSocket(endpointURI.getUri());
+                HttpRequestDispatcher httpRequestDispatcher = new HttpRequestDispatcher(connector, connector.getRetryPolicyTemplate(), serverSocket, workManager);
+                socketDispatchers.put(endpointKey, httpRequestDispatcher);
+                socketDispatcherCount.put(endpointKey, new Integer(1));
+                workManager.scheduleWork(httpRequestDispatcher, WorkManager.INDEFINITE, null, connector);
+            }
+        }
+        catch (Exception e)
+        {
+            throw new ConnectException(e, connector);
+        }
+    }
+
+    synchronized void removeConnection(final EndpointURI endpointURI)
+    {
+        String endpointKey = getKeyForEndpointUri(endpointURI);
+        if (!socketDispatchers.containsKey(endpointKey))
+        {
+            logger.warn("Trying to disconnect endpoint with uri " + endpointKey + " but " + HttpRequestDispatcher.class.getName() + " does not exists for that uri");
+            return;
+        }
+        Integer connectionsRequested = socketDispatcherCount.get(endpointKey);
+        if (connectionsRequested == LAST_CONNECTION)
+        {
+            HttpRequestDispatcher httpRequestDispatcher = socketDispatchers.get(endpointKey);
+            httpRequestDispatcher.disconnect();
+            socketDispatchers.remove(endpointKey);
+            socketDispatcherCount.remove(endpointKey);
+        }
+        else
+        {
+            socketDispatcherCount.put(endpointKey, socketDispatcherCount.get(endpointKey) - 1);
+        }
+    }
+
+    private String getKeyForEndpointUri(final EndpointURI endpointURI)
+    {
+        return endpointURI.getHost() + ":" + endpointURI.getPort();
+    }
+}
Property changes on: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpConnectionManager.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpConnector.java (25118 => 25119)


--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpConnector.java	2012-12-10 19:56:53 UTC (rev 25118)
+++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpConnector.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -12,17 +12,27 @@
 
 import org.mule.api.MuleContext;
 import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
 import org.mule.api.MuleMessage;
 import org.mule.api.construct.FlowConstruct;
+import org.mule.api.endpoint.EndpointURI;
 import org.mule.api.endpoint.ImmutableEndpoint;
 import org.mule.api.endpoint.InboundEndpoint;
 import org.mule.api.lifecycle.InitialisationException;
 import org.mule.api.processor.MessageProcessor;
+import org.mule.api.transport.MessageReceiver;
 import org.mule.config.i18n.CoreMessages;
+import org.mule.transport.ConnectException;
 import org.mule.transport.http.ntlm.NTLMScheme;
 import org.mule.transport.tcp.TcpConnector;
+import org.mule.util.MapUtils;
 
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URI;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -58,7 +68,6 @@
  * <li>proxyUsername - If the proxy requires authentication supply a username</li>
  * <li>proxyPassword - If the proxy requires authentication supply a password</li>
  * </ul>
- * 
  */
 
 public class HttpConnector extends TcpConnector
@@ -66,13 +75,13 @@
 
     public static final String HTTP = "http";
     public static final String HTTP_PREFIX = "http.";
-    
+
     /**
      * MuleEvent property to pass back the status for the response
      */
     public static final String HTTP_STATUS_PROPERTY = HTTP_PREFIX + "status";
     public static final String HTTP_VERSION_PROPERTY = HTTP_PREFIX + "version";
-    
+
     /**
      * @deprecated Instead users can now add properties to the outgoing request using the OUTBOUND property scope on the message.
      */
@@ -93,19 +102,19 @@
     public static final String HTTP_QUERY_STRING = HTTP_PREFIX + "query.string";
 
     public static final String HTTP_METHOD_PROPERTY = HTTP_PREFIX + "method";
-    
+
     /**
-     * The path and query portions of the URL being accessed. 
+     * The path and query portions of the URL being accessed.
      */
     public static final String HTTP_REQUEST_PROPERTY = HTTP_PREFIX + "request";
-    
+
     /**
      * The path portion of the URL being accessed. No query string is included.
      */
     public static final String HTTP_REQUEST_PATH_PROPERTY = HTTP_PREFIX + "request.path";
-    
+
     /**
-     * The context path of the endpoint being accessed. This is the path that the 
+     * The context path of the endpoint being accessed. This is the path that the
      * HTTP endpoint is listening on.
      */
     public static final String HTTP_CONTEXT_PATH_PROPERTY = HTTP_PREFIX + "context.path";
@@ -136,10 +145,10 @@
 
     public static final String HTTP_DISABLE_STATUS_CODE_EXCEPTION_CHECK = HTTP_PREFIX + "disable.status.code.exception.check";
     public static final String HTTP_ENCODE_PARAMVALUE = HTTP_PREFIX + "encode.paramvalue";
-    
+
     public static final Set<String> HTTP_INBOUND_PROPERTIES;
-    
-    static 
+
+    static
     {
         Set<String> props = new HashSet<String>();
         props.add(HTTP_CONTEXT_PATH_PROPERTY);
@@ -156,13 +165,14 @@
 
         AuthPolicy.registerAuthScheme(AuthPolicy.NTLM, NTLMScheme.class);
     }
-    
+
     public static final String HTTP_COOKIE_SPEC_PROPERTY = "cookieSpec";
     public static final String HTTP_COOKIES_PROPERTY = "cookies";
     public static final String HTTP_ENABLE_COOKIES_PROPERTY = "enableCookies";
 
     public static final String COOKIE_SPEC_NETSCAPE = "netscape";
     public static final String COOKIE_SPEC_RFC2109 = "rfc2109";
+    public static final String ROOT_PATH = "/";
 
     private String proxyHostname = null;
 
@@ -184,11 +194,13 @@
 
     private boolean disableCleanupThread;
 
+    private org.mule.transport.http.HttpConnectionManager connectionManager;
+
     public HttpConnector(MuleContext context)
     {
         super(context);
     }
-    
+
     @Override
     protected void doInitialise() throws InitialisationException
     {
@@ -251,7 +263,7 @@
             {
                 // normalize properties for HTTP
                 Map newProperties = new HashMap(endpointProperties.size());
-                for (Iterator entries = endpointProperties.entrySet().iterator(); entries.hasNext();)
+                for (Iterator entries = endpointProperties.entrySet().iterator(); entries.hasNext(); )
                 {
                     Map.Entry entry = (Map.Entry) entries.next();
                     Object key = entry.getKey();
@@ -355,7 +367,7 @@
         if (!(COOKIE_SPEC_NETSCAPE.equalsIgnoreCase(cookieSpec) || COOKIE_SPEC_RFC2109.equalsIgnoreCase(cookieSpec)))
         {
             throw new IllegalArgumentException(
-                CoreMessages.propertyHasInvalidValue("cookieSpec", cookieSpec).toString());
+                    CoreMessages.propertyHasInvalidValue("cookieSpec", cookieSpec).toString());
         }
         this.cookieSpec = cookieSpec;
     }
@@ -424,23 +436,23 @@
             String authScopeRealm = msg.getOutboundProperty(HTTP_PREFIX + "auth.scope.realm", AuthScope.ANY_REALM);
             String authScopeScheme = msg.getOutboundProperty(HTTP_PREFIX + "auth.scope.scheme", AuthScope.ANY_SCHEME);
             client.getState().setCredentials(
-                new AuthScope(authScopeHost, authScopePort, authScopeRealm, authScopeScheme),
-                new UsernamePasswordCredentials(event.getCredentials().getUsername(), new String(
-                    event.getCredentials().getPassword())));
+                    new AuthScope(authScopeHost, authScopePort, authScopeRealm, authScopeScheme),
+                    new UsernamePasswordCredentials(event.getCredentials().getUsername(), new String(
+                            event.getCredentials().getPassword())));
         }
         else if (endpoint.getEndpointURI().getUserInfo() != null
-            && endpoint.getProperty(HttpConstants.HEADER_AUTHORIZATION) == null)
+                 && endpoint.getProperty(HttpConstants.HEADER_AUTHORIZATION) == null)
         {
             // Add User Creds
             StringBuffer header = new StringBuffer(128);
             header.append("Basic ");
             header.append(new String(Base64.encodeBase64(endpoint.getEndpointURI().getUserInfo().getBytes(
-                endpoint.getEncoding()))));
+                    endpoint.getEncoding()))));
             httpMethod.addRequestHeader(HttpConstants.HEADER_AUTHORIZATION, header.toString());
         }
         //TODO MULE-4501 this sohuld be removed and handled only in the ObjectToHttpRequest transformer
-        else if (event!=null && event.getMessage().getOutboundProperty(HttpConstants.HEADER_AUTHORIZATION) != null &&
-                httpMethod.getRequestHeader(HttpConstants.HEADER_AUTHORIZATION)==null)
+        else if (event != null && event.getMessage().getOutboundProperty(HttpConstants.HEADER_AUTHORIZATION) != null &&
+                 httpMethod.getRequestHeader(HttpConstants.HEADER_AUTHORIZATION) == null)
         {
             String auth = event.getMessage().getOutboundProperty(HttpConstants.HEADER_AUTHORIZATION);
             httpMethod.addRequestHeader(HttpConstants.HEADER_AUTHORIZATION, auth);
@@ -457,11 +469,11 @@
      */
     public static String normalizeUrl(String url)
     {
-        if (url == null) 
+        if (url == null)
         {
             url = "/";
-        } 
-        else if (!url.startsWith("/")) 
+        }
+        else if (!url.startsWith("/"))
         {
             url = "/" + url;
         }
@@ -477,4 +489,93 @@
     {
         this.proxyNtlmAuthentication = proxyNtlmAuthentication;
     }
+
+    public void connect(EndpointURI endpointURI) throws ConnectException
+    {
+        connectionManager.addConnection(endpointURI);
+    }
+
+    public void disconnect(EndpointURI endpointURI)
+    {
+        connectionManager.removeConnection(endpointURI);
+    }
+
+    public HttpMessageReceiver lookupReceiver(Socket socket, HttpRequest request)
+    {
+        String requestUriWithoutParams = request.getUrlWithoutParams();
+
+        StringBuilder requestUri = new StringBuilder(80);
+        if (requestUriWithoutParams.indexOf("://") == -1)
+        {
+            String hostName = ((InetSocketAddress) socket.getLocalSocketAddress()).getHostName();
+            int port = ((InetSocketAddress) socket.getLocalSocketAddress()).getPort();
+            requestUri.append(getProtocol()).append("://").append(hostName).append(':').append(port);
+            if (!ROOT_PATH.equals(requestUriWithoutParams))
+            {
+                requestUri.append(requestUriWithoutParams);
+            }
+        }
+
+        String uriStr = requestUri.toString();
+        // first check that there is a receiver on the root address
+        if (logger.isTraceEnabled())
+        {
+            logger.trace("Looking up receiver on connector: " + getName() + " with URI key: "
+                         + requestUri.toString());
+        }
+
+        HttpMessageReceiver receiver = (HttpMessageReceiver) lookupReceiver(uriStr);
+
+        // If no receiver on the root and there is a request path, look up the
+        // received based on the root plus request path
+        if (receiver == null && !ROOT_PATH.equals(requestUriWithoutParams))
+        {
+            if (logger.isDebugEnabled())
+            {
+                logger.debug("Secondary lookup of receiver on connector: " + getName()
+                             + " with URI key: " + requestUri.toString());
+            }
+
+            receiver = (HttpMessageReceiver) findReceiverByStem(getReceivers(), uriStr);
+
+            if (receiver == null && logger.isWarnEnabled())
+            {
+                logger.warn("No receiver found with secondary lookup on connector: " + getName()
+                            + " with URI key: " + requestUri.toString());
+                logger.warn("Receivers on connector are: "
+                            + MapUtils.toString(getReceivers(), true));
+            }
+        }
+        return receiver;
+    }
+
+    public static MessageReceiver findReceiverByStem(Map<Object, MessageReceiver> receivers, String uriStr)
+    {
+        int match = 0;
+        MessageReceiver receiver = null;
+        for (Map.Entry<Object, MessageReceiver> e : receivers.entrySet())
+        {
+            String key = (String) e.getKey();
+            MessageReceiver candidate = e.getValue();
+            if (uriStr.startsWith(key) && match < key.length())
+            {
+                match = key.length();
+                receiver = candidate;
+            }
+        }
+        return receiver;
+    }
+
+    @Override
+    protected ServerSocket getServerSocket(URI uri) throws IOException
+    {
+        return super.getServerSocket(uri);
+    }
+
+    @Override
+    protected void doStart() throws MuleException
+    {
+        super.doStart();
+        connectionManager = new org.mule.transport.http.HttpConnectionManager(this, getReceiverWorkManager());
+    }
 }

Modified: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageReceiver.java (25118 => 25119)


--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageReceiver.java	2012-12-10 19:56:53 UTC (rev 25118)
+++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageReceiver.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -21,32 +21,24 @@
 import org.mule.api.MuleMessage;
 import org.mule.api.config.MuleProperties;
 import org.mule.api.construct.FlowConstruct;
-import org.mule.api.endpoint.EndpointURI;
 import org.mule.api.endpoint.ImmutableEndpoint;
 import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.api.execution.ExecutionCallback;
+import org.mule.api.execution.ExecutionTemplate;
 import org.mule.api.lifecycle.CreateException;
 import org.mule.api.lifecycle.InitialisationException;
 import org.mule.api.transport.Connector;
-import org.mule.api.transport.MessageReceiver;
 import org.mule.api.transport.PropertyScope;
 import org.mule.config.ExceptionHelper;
 import org.mule.config.i18n.Message;
 import org.mule.config.i18n.MessageFactory;
-import org.mule.api.execution.ExecutionCallback;
-import org.mule.api.execution.ExecutionTemplate;
+import org.mule.transport.AbstractMessageReceiver;
 import org.mule.transport.ConnectException;
 import org.mule.transport.NullPayload;
 import org.mule.transport.http.i18n.HttpMessages;
-import org.mule.transport.tcp.TcpConnector;
-import org.mule.transport.tcp.TcpMessageReceiver;
 import org.mule.util.MapUtils;
-import org.mule.util.monitor.Expirable;
 
 import java.io.IOException;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 import javax.resource.spi.work.Work;
 
@@ -57,8 +49,9 @@
  * <code>HttpMessageReceiver</code> is a simple http server that can be used to
  * listen for HTTP requests on a particular port.
  */
-public class HttpMessageReceiver extends TcpMessageReceiver
+public class HttpMessageReceiver extends AbstractMessageReceiver
 {
+
     public HttpMessageReceiver(Connector connector, FlowConstruct flowConstruct, InboundEndpoint endpoint)
             throws CreateException
     {
@@ -66,187 +59,137 @@
     }
 
     @Override
-    protected Work createWork(Socket socket) throws IOException
+    protected void doConnect() throws ConnectException
     {
-        return new HttpWorker(socket);
+        ((HttpConnector) connector).connect(endpoint.getEndpointURI());
     }
 
     @Override
-    protected void doConnect() throws ConnectException
+    protected void doDisconnect() throws Exception
     {
-        // If we already have an endpoint listening on this socket don't try and
-        // start another serversocket
-        if (this.shouldConnect())
-        {
-            super.doConnect();
-        }
+        ((HttpConnector) connector).disconnect(endpoint.getEndpointURI());
     }
 
-    protected boolean shouldConnect()
+    public Work createWork(HttpServerConnection httpServerConnection) throws IOException
     {
-        StringBuffer requestUri = new StringBuffer(80);
-        requestUri.append(endpoint.getProtocol()).append("://");
-        requestUri.append(endpoint.getEndpointURI().getHost());
-        requestUri.append(':').append(endpoint.getEndpointURI().getPort());
-        requestUri.append('*');
-
-        MessageReceiver[] receivers = connector.getReceivers(requestUri.toString());
-        for (MessageReceiver receiver : receivers)
-        {
-            if (receiver.isConnected())
-            {
-                return false;
-            }
-        }
-
-        return true;
+        return new HttpWorker(httpServerConnection);
     }
 
     @SuppressWarnings("synthetic-access")
-    protected class HttpWorker implements Work, Expirable
+    protected class HttpWorker implements Work
     {
+
         private HttpServerConnection conn;
         private String remoteClientAddress;
 
-        public HttpWorker(Socket socket) throws IOException
+        public HttpWorker(HttpServerConnection httpServerConnection) throws IOException
         {
             String encoding = endpoint.getEncoding();
             if (encoding == null)
             {
                 encoding = connector.getMuleContext().getConfiguration().getDefaultEncoding();
             }
-
-            conn = new HttpServerConnection(socket, encoding, (HttpConnector) connector);
-
-            final SocketAddress clientAddress = socket.getRemoteSocketAddress();
-            if (clientAddress != null)
-            {
-                remoteClientAddress = clientAddress.toString();
-            }
+            conn = httpServerConnection;
+            remoteClientAddress = conn.getRemoteClientAddress();
         }
 
-        @Override
-        public void expired()
+        protected HttpServerConnection getServerConnection()
         {
-            if (conn.isOpen())
-            {
-                conn.close();
-            }
+            return conn;
         }
 
         @Override
         public void run()
         {
-            long keepAliveTimeout = ((TcpConnector) connector).getKeepAliveTimeout();
-
             try
             {
-                do
+                final HttpRequest request = conn.readRequest();
+                if (request == null)
                 {
-                    conn.setKeepAlive(false);
+                    throw new EmptyRequestException();
+                }
 
-                    // Only add a monitor if the timeout has been set
-                    if (keepAliveTimeout > 0)
+                try
+                {
+                    HttpResponse httpResponse = processRequest(request);
+                    conn.writeResponse(httpResponse);
+                }
+                catch (Exception e)
+                {
+                    MuleEvent response = null;
+                    if (e instanceof MessagingException)
                     {
-                        ((HttpConnector) connector).getKeepAliveMonitor().addExpirable(
-                            keepAliveTimeout, TimeUnit.MILLISECONDS, this);
+                        response = ((MessagingException) e).getEvent();
                     }
-
-                    final HttpRequest request = conn.readRequest();
-                    if (request == null)
+                    else
                     {
-                        break;
+                        getConnector().getMuleContext().getExceptionListener().handleException(e);
                     }
 
-                    try
+                    if (response != null &&
+                        response.getMessage().getExceptionPayload() != null &&
+                        response.getMessage().getExceptionPayload().getException() instanceof MessagingException)
                     {
-                        HttpResponse httpResponse = processRequest(request);
-                        conn.writeResponse(httpResponse);
+                        e = (Exception) response.getMessage().getExceptionPayload().getException();
                     }
-                    catch (Exception e)
-                    {
-                        MuleEvent response = null;
-                        if (e instanceof MessagingException)
-                        {
-                            response = ((MessagingException) e).getEvent();
-                        }
-                        else
-                        {
-                            getConnector().getMuleContext().getExceptionListener().handleException(e);
-                        }
+                    //MULE-5656 There was custom code here for mapping status codes to exceptions
+                    //I have removed this code and now make an explicit call to the Exception helper,
+                    //but the real fix is to make sure Mule handles this automatically through the
+                    //InboundExceptionDetailsMessageProcessor
 
-                        if (response != null &&
-                            response.getMessage().getExceptionPayload() != null &&
-                            response.getMessage().getExceptionPayload().getException() instanceof MessagingException)
-                        {
-                            e = (Exception) response.getMessage().getExceptionPayload().getException();
-                        }
-                        //MULE-5656 There was custom code here for mapping status codes to exceptions
-                        //I have removed this code and now make an explicit call to the Exception helper,
-                        //but the real fix is to make sure Mule handles this automatically through the
-                        //InboundExceptionDetailsMessageProcessor
+                    //Response code mappings are loaded from META-INF/services/org/mule/config/http-exception-mappings.properties
+                    String temp = ExceptionHelper.getErrorMapping(connector.getProtocol(), e.getClass(), flowConstruct.getMuleContext());
+                    int httpStatus = Integer.valueOf(temp);
 
-                        //Response code mappings are loaded from META-INF/services/org/mule/config/http-exception-mappings.properties
-                        String temp = ExceptionHelper.getErrorMapping(connector.getProtocol(), e.getClass(),flowConstruct.getMuleContext());
-                        int httpStatus = Integer.valueOf(temp);
-
-                        if (e instanceof MessagingException)
-                        {
-                            MuleEvent event = ((MessagingException) e).getEvent();
-                            httpStatus = event.getMessage().getOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY) != null ? Integer.valueOf(event.getMessage().getOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY).toString()) : httpStatus;
-                            conn.writeResponse(buildFailureResponse(event, e.getMessage(),httpStatus));
-                        }
-                        else
-                        {
-                            conn.writeResponse(buildFailureResponse(request.getRequestLine().getHttpVersion(), httpStatus, e.getMessage()));
-                        }
-                        break;
+                    if (e instanceof MessagingException)
+                    {
+                        MuleEvent event = ((MessagingException) e).getEvent();
+                        httpStatus = event.getMessage().getOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY) != null ? Integer.valueOf(event.getMessage().getOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY).toString()) : httpStatus;
+                        conn.writeResponse(buildFailureResponse(event, e.getMessage(), httpStatus));
                     }
-                    finally
+                    else
                     {
-                        // Ensure that we drop any monitors
-                        ((HttpConnector) connector).getKeepAliveMonitor().removeExpirable(this);
-
-                        if (request.getBody() != null)
-                        {
-                            request.getBody().close();
-                        }
+                        conn.writeFailureResponse(httpStatus, e.getMessage());
                     }
+                    throw new FailureProcessingRequestException();
                 }
-                while (conn.isKeepAlive());
+                finally
+                {
+                    if (request.getBody() != null)
+                    {
+                        request.getBody().close();
+                    }
+                }
             }
-            catch (Exception e)
+            catch (EmptyRequestException e)
             {
-                getConnector().getMuleContext().getExceptionListener().handleException(e);
+                throw e;
             }
-            finally
+            catch (FailureProcessingRequestException e)
             {
-                logger.debug("Closing HTTP connection.");
-
-                if (conn.isOpen())
-                {
-                    conn.close();
-                    conn = null;
-                }
+                throw e;
             }
+            catch (Exception e)
+            {
+                getConnector().getMuleContext().getExceptionListener().handleException(e);
+            }
         }
 
 
-
         protected HttpResponse processRequest(HttpRequest request) throws MuleException, IOException
         {
             RequestLine requestLine = request.getRequestLine();
             String method = requestLine.getMethod();
 
             if (method.equals(HttpConstants.METHOD_GET)
-                    || method.equals(HttpConstants.METHOD_HEAD)
-                    || method.equals(HttpConstants.METHOD_POST)
-                    || method.equals(HttpConstants.METHOD_OPTIONS)
-                    || method.equals(HttpConstants.METHOD_PUT)
-                    || method.equals(HttpConstants.METHOD_DELETE)
-                    || method.equals(HttpConstants.METHOD_TRACE)
-                    || method.equals(HttpConstants.METHOD_CONNECT)
-                    || method.equals(HttpConstants.METHOD_PATCH))
+                || method.equals(HttpConstants.METHOD_HEAD)
+                || method.equals(HttpConstants.METHOD_POST)
+                || method.equals(HttpConstants.METHOD_OPTIONS)
+                || method.equals(HttpConstants.METHOD_PUT)
+                || method.equals(HttpConstants.METHOD_DELETE)
+                || method.equals(HttpConstants.METHOD_TRACE)
+                || method.equals(HttpConstants.METHOD_CONNECT)
+                || method.equals(HttpConstants.METHOD_PATCH))
             {
                 return doRequest(request);
             }
@@ -277,110 +220,98 @@
             }
 
             // determine if the request path on this request denotes a different receiver
-            final MessageReceiver receiver = getTargetReceiver(message, endpoint);
+            //final MessageReceiver receiver = getTargetReceiver(message, endpoint);
 
             HttpResponse response;
             // the response only needs to be transformed explicitly if
             // A) the request was not served or B) a null result was returned
-            if (receiver != null)
-            {
-                String contextPath = HttpConnector.normalizeUrl(receiver.getEndpointURI().getPath());
-                message.setProperty(HttpConnector.HTTP_CONTEXT_PATH_PROPERTY,
-                                           contextPath,
-                                           PropertyScope.INBOUND);
+            String contextPath = HttpConnector.normalizeUrl(getEndpointURI().getPath());
+            message.setProperty(HttpConnector.HTTP_CONTEXT_PATH_PROPERTY,
+                                contextPath,
+                                PropertyScope.INBOUND);
 
-                message.setProperty(HttpConnector.HTTP_CONTEXT_URI_PROPERTY,
-                                    receiver.getEndpointURI().getAddress(),
-                                    PropertyScope.INBOUND);
+            message.setProperty(HttpConnector.HTTP_CONTEXT_URI_PROPERTY,
+                                getEndpointURI().getAddress(),
+                                PropertyScope.INBOUND);
 
-                message.setProperty(HttpConnector.HTTP_RELATIVE_PATH_PROPERTY,
-                                    processRelativePath(contextPath, path),
-                                    PropertyScope.INBOUND);
+            message.setProperty(HttpConnector.HTTP_RELATIVE_PATH_PROPERTY,
+                                processRelativePath(contextPath, path),
+                                PropertyScope.INBOUND);
 
-                ExecutionTemplate<MuleEvent> executionTemplate = createExecutionTemplate();
+            ExecutionTemplate<MuleEvent> executionTemplate = createExecutionTemplate();
 
-                MuleEvent returnEvent;
-                try
+            MuleEvent returnEvent;
+            try
+            {
+                returnEvent = executionTemplate.execute(new ExecutionCallback<MuleEvent>()
                 {
-                    returnEvent = executionTemplate.execute(new ExecutionCallback<MuleEvent>()
+                    @Override
+                    public MuleEvent process() throws Exception
                     {
-                        @Override
-                        public MuleEvent process() throws Exception
-                        {
-                            preRouteMessage(message);
-                            return receiver.routeMessage(message);
-                        }
-                    });
-                }
-                catch (MuleException e)
-                {
-                    throw e;
-                }
-                catch (IOException e)
-                {
-                    throw e;
-                }
-                catch (Exception e)
-                {
-                    throw new DefaultMuleException(e);
-                }
+                        preRouteMessage(message);
+                        return routeMessage(message);
+                    }
+                });
+            }
+            catch (MuleException e)
+            {
+                throw e;
+            }
+            catch (IOException e)
+            {
+                throw e;
+            }
+            catch (Exception e)
+            {
+                throw new DefaultMuleException(e);
+            }
 
-                MuleMessage returnMessage = returnEvent == null ? null : returnEvent.getMessage();
+            MuleMessage returnMessage = returnEvent == null ? null : returnEvent.getMessage();
 
-                Object tempResponse;
-                if (returnMessage != null)
-                {
-                    tempResponse = returnMessage.getPayload();
-                }
-                else
-                {
-                    tempResponse = NullPayload.getInstance();
-                }
-                // This removes the need for users to explicitly adding the response transformer
-                // ObjectToHttpResponse in their config
-                if (tempResponse instanceof HttpResponse)
-                {
-                    response = (HttpResponse) tempResponse;
-                }
-                else
-                {
-                    response = transformResponse(returnMessage, returnEvent);
-                }
+            Object tempResponse;
+            if (returnMessage != null)
+            {
+                tempResponse = returnMessage.getPayload();
+            }
+            else
+            {
+                tempResponse = NullPayload.getInstance();
+            }
+            // This removes the need for users to explicitly adding the response transformer
+            // ObjectToHttpResponse in their config
+            if (tempResponse instanceof HttpResponse)
+            {
+                response = (HttpResponse) tempResponse;
+            }
+            else
+            {
+                response = transformResponse(returnMessage);
+            }
 
-                response.setupKeepAliveFromRequestVersion(request.getRequestLine().getHttpVersion());
-                HttpConnector httpConnector = (HttpConnector) connector;
-                response.disableKeepAlive(!httpConnector.isKeepAlive());
+            response.setupKeepAliveFromRequestVersion(request.getRequestLine().getHttpVersion());
+            HttpConnector httpConnector = (HttpConnector) connector;
+            response.disableKeepAlive(!httpConnector.isKeepAlive());
 
-                Header connectionHeader = request.getFirstHeader("Connection");
-                if (connectionHeader != null)
+            Header connectionHeader = request.getFirstHeader("Connection");
+            if (connectionHeader != null)
+            {
+                String value = connectionHeader.getValue();
+                boolean endpointOverride = getEndpointKeepAliveValue(endpoint);
+                if ("keep-alive".equalsIgnoreCase(value) && endpointOverride)
                 {
-                    String value = connectionHeader.getValue();
-                    boolean endpointOverride = getEndpointKeepAliveValue(endpoint);
-                    if ("keep-alive".equalsIgnoreCase(value) && endpointOverride)
-                    {
-                        response.setKeepAlive(true);
+                    response.setKeepAlive(true);
 
-                        if (response.getHttpVersion().equals(HttpVersion.HTTP_1_0))
-                        {
-                            connectionHeader = new Header(HttpConstants.HEADER_CONNECTION, "Keep-Alive");
-                            response.setHeader(connectionHeader);
-                        }
-                    }
-                    else if ("close".equalsIgnoreCase(value))
+                    if (response.getHttpVersion().equals(HttpVersion.HTTP_1_0))
                     {
-                        response.setKeepAlive(false);
+                        connectionHeader = new Header(HttpConstants.HEADER_CONNECTION, "Keep-Alive");
+                        response.setHeader(connectionHeader);
                     }
                 }
+                else if ("close".equalsIgnoreCase(value))
+                {
+                    response.setKeepAlive(false);
+                }
             }
-            else
-            {
-                EndpointURI uri = endpoint.getEndpointURI();
-                String failedPath = String.format("%<a href="s://%s:%d%s">s://%s:%d%s",
-                                                  uri.getScheme(), uri.getHost(), uri.getPort(),
-                                                  message.getInboundProperty(HttpConnector.HTTP_REQUEST_PATH_PROPERTY));
-                response = buildFailureResponse(request.getRequestLine().getHttpVersion(), HttpConstants.SC_NOT_FOUND,
-                                                HttpMessages.cannotBindToAddress(failedPath).toString());
-            }
             return response;
         }
 
@@ -407,7 +338,7 @@
             HttpResponse response = new HttpResponse();
             response.setStatusLine(requestLine.getHttpVersion(), HttpConstants.SC_METHOD_NOT_ALLOWED);
             response.setBody(HttpMessages.methodNotAllowed(method).toString() + HttpConstants.CRLF);
-            return transformResponse(response, event);
+            return transformResponse(response);
         }
 
         protected HttpResponse doBad(RequestLine requestLine) throws MuleException
@@ -418,7 +349,7 @@
             HttpResponse response = new HttpResponse();
             response.setStatusLine(requestLine.getHttpVersion(), HttpConstants.SC_BAD_REQUEST);
             response.setBody(HttpMessages.malformedSyntax().toString() + HttpConstants.CRLF);
-            return transformResponse(response, event);
+            return transformResponse(response);
         }
 
         private void sendExpect100(HttpRequest request) throws MuleException, IOException
@@ -441,9 +372,9 @@
                         HttpResponse expected = new HttpResponse();
                         expected.setStatusLine(requestLine.getHttpVersion(), HttpConstants.SC_CONTINUE);
                         final DefaultMuleEvent event = new DefaultMuleEvent(new DefaultMuleMessage(expected,
-                            connector.getMuleContext()), (InboundEndpoint) endpoint, flowConstruct);
+                                                                                                   connector.getMuleContext()), (InboundEndpoint) endpoint, flowConstruct);
                         RequestContext.setEvent(event);
-                        conn.writeResponse(transformResponse(expected, event));
+                        conn.writeResponse(transformResponse(expected));
                     }
                 }
             }
@@ -451,9 +382,9 @@
 
         private HttpResponse buildFailureResponse(MuleEvent event, String description, int httpStatusCode) throws MuleException
         {
-            event.getMessage().setOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY,httpStatusCode);
+            event.getMessage().setOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY, httpStatusCode);
             event.getMessage().setPayload(description);
-            return transformResponse(event.getMessage(), event);
+            return transformResponse(event.getMessage());
         }
 
         protected HttpResponse buildFailureResponse(HttpVersion version, int statusCode, String description) throws MuleException
@@ -462,10 +393,10 @@
             response.setStatusLine(version, statusCode);
             response.setBody(description);
             DefaultMuleEvent event = new DefaultMuleEvent(new DefaultMuleMessage(response,
-                connector.getMuleContext()), (InboundEndpoint) endpoint, flowConstruct);
+                                                                                 connector.getMuleContext()), (InboundEndpoint) endpoint, flowConstruct);
             RequestContext.setEvent(event);
             // The DefaultResponseTransformer will set the necessary headers
-            return transformResponse(response, event);
+            return transformResponse(response);
         }
 
         protected void preRouteMessage(MuleMessage message) throws MessagingException
@@ -476,80 +407,22 @@
         @Override
         public void release()
         {
-            conn.close();
-            conn = null;
+            //Nothing to do.
         }
     }
 
     protected String processRelativePath(String contextPath, String path)
     {
         String relativePath = path.substring(contextPath.length());
-        if(relativePath.startsWith("/"))
+        if (relativePath.startsWith("/"))
         {
             return relativePath.substring(1);
         }
         return relativePath;
     }
 
-    protected MessageReceiver getTargetReceiver(MuleMessage message, ImmutableEndpoint ep)
-            throws ConnectException
+    protected HttpResponse transformResponse(Object response) throws MuleException
     {
-        String path = message.getInboundProperty(HttpConnector.HTTP_REQUEST_PROPERTY);
-        int i = path.indexOf('?');
-        if (i > -1)
-        {
-            path = path.substring(0, i);
-        }
-
-        StringBuffer requestUri = new StringBuffer(80);
-        if (path.indexOf("://") == -1)
-        {
-            requestUri.append(ep.getProtocol()).append("://");
-            requestUri.append(ep.getEndpointURI().getHost());
-            requestUri.append(':').append(ep.getEndpointURI().getPort());
-
-            if (!"/".equals(path))
-            {
-                requestUri.append(path);
-            }
-        }
-
-        String uriStr = requestUri.toString();
-        // first check that there is a receiver on the root address
-        if (logger.isTraceEnabled())
-        {
-            logger.trace("Looking up receiver on connector: " + connector.getName() + " with URI key: "
-                    + requestUri.toString());
-        }
-
-        MessageReceiver receiver = connector.lookupReceiver(uriStr);
-
-        // If no receiver on the root and there is a request path, look up the
-        // received based on the root plus request path
-        if (receiver == null && !"/".equals(path))
-        {
-            if (logger.isDebugEnabled())
-            {
-                logger.debug("Secondary lookup of receiver on connector: " + connector.getName()
-                        + " with URI key: " + requestUri.toString());
-            }
-
-            receiver = findReceiverByStem(connector.getReceivers(), uriStr);
-
-            if (receiver == null && logger.isWarnEnabled())
-            {
-                logger.warn("No receiver found with secondary lookup on connector: " + connector.getName()
-                        + " with URI key: " + requestUri.toString());
-                logger.warn("Receivers on connector are: "
-                        + MapUtils.toString(connector.getReceivers(), true));
-            }
-        }
-
-        return receiver;
-    }
-
-    protected HttpResponse transformResponse(Object response, MuleEvent event) throws MuleException
-    {
         MuleMessage message;
         if (response instanceof MuleMessage)
         {
@@ -565,22 +438,6 @@
         return (HttpResponse) message.getPayload();
     }
 
-    public static MessageReceiver findReceiverByStem(Map<Object, MessageReceiver> receivers, String uriStr)
-    {
-        int match = 0;
-        MessageReceiver receiver = null;
-        for (Map.Entry<Object, MessageReceiver> e : receivers.entrySet())
-        {
-            String key = (String) e.getKey();
-            MessageReceiver candidate = e.getValue();
-            if (uriStr.startsWith(key) && match < key.length())
-            {
-                match = key.length();
-                receiver = candidate;
-            }
-        }
-        return receiver;
-    }
 
     @Override
     protected void initializeMessageFactory() throws InitialisationException
@@ -591,11 +448,11 @@
             factory = (HttpMuleMessageFactory) super.createMuleMessageFactory();
 
             boolean enableCookies = MapUtils.getBooleanValue(endpoint.getProperties(),
-                HttpConnector.HTTP_ENABLE_COOKIES_PROPERTY, ((HttpConnector) connector).isEnableCookies());
+                                                             HttpConnector.HTTP_ENABLE_COOKIES_PROPERTY, ((HttpConnector) connector).isEnableCookies());
             factory.setEnableCookies(enableCookies);
 
             String cookieSpec = MapUtils.getString(endpoint.getProperties(),
-                HttpConnector.HTTP_COOKIE_SPEC_PROPERTY, ((HttpConnector) connector).getCookieSpec());
+                                                   HttpConnector.HTTP_COOKIE_SPEC_PROPERTY, ((HttpConnector) connector).getCookieSpec());
             factory.setCookieSpec(cookieSpec);
 
             factory.setExchangePattern(endpoint.getExchangePattern());
@@ -622,4 +479,14 @@
         message.setOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY, String.valueOf(HttpConstants.SC_NOT_ACCEPTABLE));
         return message;
     }
+
+    public static class EmptyRequestException extends RuntimeException
+    {
+
+    }
+
+    public static class FailureProcessingRequestException extends RuntimeException
+    {
+
+    }
 }

Modified: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequest.java (25118 => 25119)


--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequest.java	2012-12-10 19:56:53 UTC (rev 25118)
+++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequest.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -36,7 +36,7 @@
     private String defaultEncoding;
 
     public HttpRequest(final RequestLine requestLine, final Header[] headers, final InputStream content, String defaultEncoding)
-        throws IOException
+            throws IOException
     {
         super();
         if (requestLine == null)
@@ -240,4 +240,9 @@
         }
     }
 
+    public String getUrlWithoutParams()
+    {
+        return this.requestLine.getUrlWithoutParams();
+    }
+
 }

Added: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcher.java (0 => 25119)


--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcher.java	                        (rev 0)
+++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcher.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -0,0 +1,144 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.http;
+
+import org.mule.api.context.WorkManager;
+import org.mule.api.retry.RetryCallback;
+import org.mule.api.retry.RetryContext;
+import org.mule.api.retry.RetryPolicyTemplate;
+import org.mule.transport.ConnectException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.resource.spi.work.Work;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Manage a ServerSocket.
+ * <p/>
+ * Lookup the right MessageReceiver for each HttpRequest and dispatch the socket to the MessageReceiver for further processing.
+ */
+class HttpRequestDispatcher implements Work
+{
+
+    private static Log logger = LogFactory.getLog(HttpRequestDispatcher.class);
+
+    private ServerSocket serverSocket;
+    private HttpConnector httpConnector;
+    private RetryPolicyTemplate retryTemplate;
+    private WorkManager workManager;
+    private final AtomicBoolean disconnect = new AtomicBoolean(false);
+
+    public HttpRequestDispatcher(final HttpConnector httpConnector, final RetryPolicyTemplate retryPolicyTemplate, final ServerSocket serverSocket, final WorkManager workManager)
+    {
+        if (httpConnector == null)
+        {
+            throw new IllegalArgumentException("HttpConnector can not be null");
+        }
+        if (retryPolicyTemplate == null)
+        {
+            throw new IllegalArgumentException("RetryPolicyTemplate can not be null");
+        }
+        if (serverSocket == null)
+        {
+            throw new IllegalArgumentException("ServerSocket can not be null");
+        }
+        if (workManager == null)
+        {
+            throw new IllegalArgumentException("WorkManager can not be null");
+        }
+        this.httpConnector = httpConnector;
+        this.retryTemplate = retryPolicyTemplate;
+        this.serverSocket = serverSocket;
+        this.workManager = workManager;
+    }
+
+    @Override
+    public void run()
+    {
+        while (!disconnect.get())
+        {
+            if (httpConnector.isStarted() && !disconnect.get())
+            {
+                try
+                {
+                    retryTemplate.execute(new RetryCallback()
+                    {
+                        public void doWork(RetryContext context) throws Exception
+                        {
+                            Socket socket = null;
+                            try
+                            {
+                                socket = serverSocket.accept();
+                            }
+                            catch (Exception e)
+                            {
+                                if (!httpConnector.isDisposed() && !disconnect.get())
+                                {
+                                    throw new ConnectException(e, null);
+                                }
+                            }
+
+                            if (socket != null)
+                            {
+                                Work work = new HttpRequestDispatcherWork(httpConnector, socket);
+                                workManager.scheduleWork(work, javax.resource.spi.work.WorkManager.INDEFINITE, null, httpConnector);
+                            }
+                        }
+
+                        public String getWorkDescription()
+                        {
+                            String hostName = ((InetSocketAddress) serverSocket.getLocalSocketAddress()).getHostName();
+                            int port = ((InetSocketAddress) serverSocket.getLocalSocketAddress()).getPort();
+                            return String.format("%<a href="s://%s:%d">s://%s:%d", httpConnector.getProtocol(), hostName, port);
+                        }
+                    }, workManager);
+                }
+                catch (Exception e)
+                {
+                    httpConnector.getMuleContext().getExceptionListener().handleException(e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void release()
+    {
+
+    }
+
+    void disconnect()
+    {
+        disconnect.set(true);
+        try
+        {
+            if (serverSocket != null)
+            {
+                if (logger.isDebugEnabled())
+                {
+                    logger.debug("Closing: " + serverSocket);
+                }
+                serverSocket.close();
+            }
+        }
+        catch (IOException e)
+        {
+            logger.warn("Failed to close server socket: " + e.getMessage(), e);
+        }
+    }
+
+}
Property changes on: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcher.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcherWork.java (0 => 25119)


--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcherWork.java	                        (rev 0)
+++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcherWork.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -0,0 +1,128 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.http;
+
+import org.mule.transport.http.i18n.HttpMessages;
+import org.mule.util.monitor.Expirable;
+
+import java.net.Socket;
+import java.util.concurrent.TimeUnit;
+
+import javax.resource.spi.work.Work;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Dispatches HttpRequest to the appropriate MessageReceiver
+ */
+public class HttpRequestDispatcherWork implements Work, Expirable
+{
+
+    private static Log logger = LogFactory.getLog(HttpRequestDispatcherWork.class);
+
+    private HttpServerConnection httpServerConnection;
+    private Socket socket;
+    private HttpConnector httpConnector;
+
+    public HttpRequestDispatcherWork(HttpConnector httpConnector, Socket socket)
+    {
+        if (httpConnector == null)
+        {
+            throw new IllegalArgumentException("HttpConnector can not be null");
+        }
+        if (socket == null)
+        {
+            throw new IllegalArgumentException("Socket can not be null");
+        }
+        this.httpConnector = httpConnector;
+        this.socket = socket;
+    }
+
+    @Override
+    public void release()
+    {
+    }
+
+    @Override
+    public void run()
+    {
+        try
+        {
+            long keepAliveTimeout = httpConnector.getKeepAliveTimeout();
+            String encoding = httpConnector.getMuleContext().getConfiguration().getDefaultEncoding();
+            httpServerConnection = new HttpServerConnection(socket, encoding, httpConnector);
+            do
+            {
+                try
+                {
+
+                    httpServerConnection.setKeepAlive(false);
+
+                    // Only add a monitor if the timeout has been set
+                    if (keepAliveTimeout > 0)
+                    {
+                        httpConnector.getKeepAliveMonitor().addExpirable(
+                                keepAliveTimeout, TimeUnit.MILLISECONDS, this);
+                    }
+
+                    HttpRequest request = httpServerConnection.readRequest();
+                    HttpMessageReceiver httpMessageReceiver = httpConnector.lookupReceiver(socket, request);
+                    if (httpMessageReceiver != null)
+                    {
+                        Work work = httpMessageReceiver.createWork(httpServerConnection);
+                        work.run();
+                    }
+                    else
+                    {
+                        httpServerConnection.writeFailureResponse(HttpConstants.SC_NOT_FOUND, HttpMessages.cannotBindToAddress(httpServerConnection.getFullUri()).toString());
+                    }
+                }
+                finally
+                {
+                    httpConnector.getKeepAliveMonitor().removeExpirable(this);
+                    httpServerConnection.reset();
+                }
+            }
+            while (httpServerConnection.isKeepAlive());
+        }
+        catch (HttpMessageReceiver.EmptyRequestException e)
+        {
+            logger.debug("Discarding request since content was empty");
+        }
+        catch (HttpMessageReceiver.FailureProcessingRequestException e)
+        {
+            logger.debug("Closing socket due to failure during request processing");
+        }
+        catch (Exception e)
+        {
+            httpConnector.getMuleContext().getExceptionListener().handleException(e);
+        }
+        finally
+        {
+            logger.debug("Closing HTTP connection.");
+            if (httpServerConnection != null && httpServerConnection.isOpen())
+            {
+                httpServerConnection.close();
+                httpServerConnection = null;
+            }
+        }
+    }
+
+    @Override
+    public void expired()
+    {
+        if (httpServerConnection.isOpen())
+        {
+            httpServerConnection.close();
+        }
+    }
+
+}
Property changes on: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcherWork.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpServerConnection.java (25118 => 25119)


--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpServerConnection.java	2012-12-10 19:56:53 UTC (rev 25118)
+++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpServerConnection.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -11,20 +11,28 @@
 package org.mule.transport.http;
 
 import org.mule.RequestContext;
-import org.mule.api.transformer.TransformerException;
 import org.mule.api.transport.Connector;
 import org.mule.api.transport.OutputHandler;
 import org.mule.util.SystemUtils;
+import org.mule.util.concurrent.Latch;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketAddress;
 import java.net.SocketException;
+import java.security.cert.Certificate;
 import java.util.Iterator;
 
+import javax.net.ssl.HandshakeCompletedEvent;
+import javax.net.ssl.HandshakeCompletedListener;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSocket;
+
 import org.apache.commons.httpclient.ChunkedOutputStream;
 import org.apache.commons.httpclient.Header;
 import org.apache.commons.httpclient.HttpParser;
@@ -33,9 +41,12 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-/** A connection to the SimpleHttpServer. */
-public class HttpServerConnection
+/**
+ * A connection to the SimpleHttpServer.
+ */
+public class HttpServerConnection implements HandshakeCompletedListener
 {
+
     private static final Log logger = LogFactory.getLog(HttpServerConnection.class);
 
     private Socket socket;
@@ -44,6 +55,10 @@
     // this should rather be isKeepSocketOpen as this is the main purpose of this flag
     private boolean keepAlive = false;
     private final String encoding;
+    private HttpRequest cachedRequest;
+    private Latch sslSocketHandshakeComplete = new Latch();
+    private Certificate[] peerCertificateChain;
+    private Certificate[] localCertificateChain;
 
     public HttpServerConnection(final Socket socket, String encoding, HttpConnector connector) throws IOException
     {
@@ -55,20 +70,26 @@
         }
 
         this.socket = socket;
+
+        if (this.socket instanceof SSLSocket)
+        {
+            ((SSLSocket) socket).addHandshakeCompletedListener(this);
+        }
+
         setSocketTcpNoDelay();
         this.socket.setKeepAlive(connector.isKeepAlive());
-        
+
         if (connector.getReceiveBufferSize() != Connector.INT_VALUE_NOT_SET
             && socket.getReceiveBufferSize() != connector.getReceiveBufferSize())
         {
-            socket.setReceiveBufferSize(connector.getReceiveBufferSize());            
+            socket.setReceiveBufferSize(connector.getReceiveBufferSize());
         }
         if (connector.getServerSoTimeout() != Connector.INT_VALUE_NOT_SET
             && socket.getSoTimeout() != connector.getServerSoTimeout())
         {
             socket.setSoTimeout(connector.getServerSoTimeout());
         }
-        
+
         this.in = socket.getInputStream();
         this.out = new DataOutputStream(socket.getOutputStream());
         this.encoding = encoding;
@@ -86,7 +107,7 @@
             {
                 // this is a known Solaris bug, see
                 // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6378870
-                
+
                 if (logger.isDebugEnabled())
                 {
                     logger.debug("Failed to set tcpNoDelay on socket", se);
@@ -109,7 +130,7 @@
                 {
                     logger.debug("Closing: " + socket);
                 }
-                
+
                 try
                 {
                     socket.shutdownOutput();
@@ -118,7 +139,7 @@
                 {
                     //Can't shutdown in/output on SSL sockets
                 }
-                
+
                 if (in != null)
                 {
                     in.close();
@@ -180,6 +201,10 @@
 
     public HttpRequest readRequest() throws IOException
     {
+        if (cachedRequest != null)
+        {
+            return cachedRequest;
+        }
         try
         {
             String line = readLine();
@@ -187,7 +212,8 @@
             {
                 return null;
             }
-            return new HttpRequest(RequestLine.parseLine(line), HttpParser.parseHeaders(this.in, encoding), this.in, encoding);
+            cachedRequest = new HttpRequest(RequestLine.parseLine(line), HttpParser.parseHeaders(this.in, encoding), this.in, encoding);
+            return cachedRequest;
         }
         catch (IOException e)
         {
@@ -271,21 +297,21 @@
         outstream.flush();
     }
 
-    public void writeResponse(final HttpResponse response) throws IOException, TransformerException
+    public void writeResponse(final HttpResponse response) throws IOException
     {
         if (response == null)
         {
             return;
         }
-        
-        if (!response.isKeepAlive()) 
+
+        if (!response.isKeepAlive())
         {
             Header header = new Header(HttpConstants.HEADER_CONNECTION, "close");
             response.setHeader(header);
         }
-        
+
         setKeepAlive(response.isKeepAlive());
-        
+
         ResponseWriter writer = new ResponseWriter(this.out, encoding);
         OutputStream outstream = this.out;
 
@@ -323,6 +349,57 @@
         outstream.flush();
     }
 
+    /**
+     * Returns the path of the http request without the http parameters encoded in the URL
+     *
+     * @return
+     * @throws IOException
+     */
+    public String getUrlWithoutRequestParams() throws IOException
+    {
+        return readRequest().getUrlWithoutParams();
+    }
+
+    public String getRemoteClientAddress()
+    {
+        final SocketAddress clientAddress = socket.getRemoteSocketAddress();
+        if (clientAddress != null)
+        {
+            return clientAddress.toString();
+        }
+        return null;
+    }
+
+    /**
+     * Sends to the customer a Failure response.
+     *
+     * @param statusCode  http status code to send to the client
+     * @param description description to send as the body of the response
+     * @throws IOException when it's not possible to write the response back to the client.
+     */
+    public void writeFailureResponse(int statusCode, String description) throws IOException
+    {
+        HttpResponse response = new HttpResponse();
+        response.setStatusLine(readRequest().getRequestLine().getHttpVersion(), statusCode);
+        response.setBody(description);
+        writeResponse(response);
+    }
+
+    /**
+     * @return the uri for the request including scheme, host, port and path. i.e: http://192.168.1.1:7777/service/orders
+     * @throws IOException
+     */
+    public String getFullUri() throws IOException
+    {
+        String scheme = "http";
+        if (socket instanceof SSLSocket)
+        {
+            scheme = "https";
+        }
+        InetSocketAddress localSocketAddress = (InetSocketAddress) socket.getLocalSocketAddress();
+        return String.format("%<a href="s://%s:%d%s">s://%s:%d%s", scheme, localSocketAddress.getHostName(), localSocketAddress.getPort(), readRequest().getUrlWithoutParams());
+    }
+
     public int getSocketTimeout() throws SocketException
     {
         return this.socket.getSoTimeout();
@@ -332,4 +409,62 @@
     {
         this.socket.setSoTimeout(timeout);
     }
+
+    public Latch getSslSocketHandshakeCompleteLatch()
+    {
+        if (!(socket instanceof SSLSocket))
+        {
+            throw new IllegalStateException("The socket type is not SSL");
+        }
+        return sslSocketHandshakeComplete;
+    }
+
+    /**
+     * Clean up cached values.
+     * <p/>
+     * Must be called if a new request from the same socket associated with the instance is going to be processed.
+     */
+    public void reset()
+    {
+        this.cachedRequest = null;
+    }
+
+    @Override
+    public void handshakeCompleted(HandshakeCompletedEvent handshakeCompletedEvent)
+    {
+        try
+        {
+            localCertificateChain = handshakeCompletedEvent.getLocalCertificates();
+            try
+            {
+                peerCertificateChain = handshakeCompletedEvent.getPeerCertificates();
+            }
+            catch (SSLPeerUnverifiedException e)
+            {
+                logger.debug("Cannot get peer certificate chain: " + e.getMessage());
+            }
+        }
+        finally
+        {
+            sslSocketHandshakeComplete.release();
+        }
+    }
+
+    public Certificate[] getLocalCertificateChain()
+    {
+        if (!(socket instanceof SSLSocket))
+        {
+            throw new IllegalStateException("The socket type is not SSL");
+        }
+        return localCertificateChain;
+    }
+
+    public Certificate[] getPeerCertificateChain()
+    {
+        if (!(socket instanceof SSLSocket))
+        {
+            throw new IllegalStateException("The socket type is not SSL");
+        }
+        return peerCertificateChain;
+    }
 }

Modified: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageReceiver.java (25118 => 25119)


--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageReceiver.java	2012-12-10 19:56:53 UTC (rev 25118)
+++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageReceiver.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -22,19 +22,13 @@
 import org.mule.util.StringUtils;
 
 import java.io.IOException;
-import java.net.Socket;
-import java.security.cert.Certificate;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import javax.net.ssl.HandshakeCompletedEvent;
-import javax.net.ssl.HandshakeCompletedListener;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSocket;
 import javax.resource.spi.work.Work;
 
 public class HttpsMessageReceiver extends HttpMessageReceiver
 {
+
     public HttpsMessageReceiver(Connector connector, FlowConstruct flow, InboundEndpoint endpoint)
             throws CreateException
     {
@@ -59,21 +53,17 @@
     }
 
     @Override
-    protected Work createWork(Socket socket) throws IOException
+    public Work createWork(HttpServerConnection httpServerConnection) throws IOException
     {
-        return new HttpsWorker(socket);
+        return new HttpsWorker(httpServerConnection);
     }
 
-    private class HttpsWorker extends HttpWorker implements HandshakeCompletedListener
+    private class HttpsWorker extends HttpWorker
     {
-        private Certificate[] peerCertificateChain;
-        private Certificate[] localCertificateChain;
-        private final CountDownLatch latch = new CountDownLatch(1);
 
-        public HttpsWorker(Socket socket) throws IOException
+        public HttpsWorker(HttpServerConnection httpServerConnection) throws IOException
         {
-            super(socket);
-            ((SSLSocket) socket).addHandshakeCompletedListener(this);
+            super(httpServerConnection);
         }
 
         @Override
@@ -82,7 +72,7 @@
             try
             {
                 long timeout = ((HttpsConnector) getConnector()).getSslHandshakeTimeout();
-                boolean handshakeComplete = latch.await(timeout, TimeUnit.MILLISECONDS);
+                boolean handshakeComplete = getServerConnection().getSslSocketHandshakeCompleteLatch().await(timeout, TimeUnit.MILLISECONDS);
                 if (!handshakeComplete)
                 {
                     throw new MessagingException(HttpMessages.sslHandshakeDidNotComplete(), message);
@@ -91,39 +81,20 @@
             catch (InterruptedException e)
             {
                 throw new MessagingException(HttpMessages.sslHandshakeDidNotComplete(),
-                    message, e);
+                                             message, e);
             }
 
             super.preRouteMessage(message);
 
-            if (peerCertificateChain != null)
+            if (getServerConnection().getPeerCertificateChain() != null)
             {
-                message.setOutboundProperty(HttpsConnector.PEER_CERTIFICATES, peerCertificateChain);
+                message.setOutboundProperty(HttpsConnector.PEER_CERTIFICATES, getServerConnection().getPeerCertificateChain());
             }
-            if (localCertificateChain != null)
+            if (getServerConnection().getLocalCertificateChain() != null)
             {
-                message.setOutboundProperty(HttpsConnector.LOCAL_CERTIFICATES, localCertificateChain);
+                message.setOutboundProperty(HttpsConnector.LOCAL_CERTIFICATES, getServerConnection().getLocalCertificateChain());
             }
         }
 
-        public void handshakeCompleted(HandshakeCompletedEvent event)
-        {
-            try
-            {
-                localCertificateChain = event.getLocalCertificates();
-                try
-                {
-                    peerCertificateChain = event.getPeerCertificates();
-                }
-                catch (SSLPeerUnverifiedException e)
-                {
-                    logger.debug("Cannot get peer certificate chain: "+ e.getMessage());
-                }
-            }
-            finally
-            {
-                latch.countDown();
-            }
-        }
     }
 }

Modified: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/RequestLine.java (25118 => 25119)


--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/RequestLine.java	2012-12-10 19:56:53 UTC (rev 25118)
+++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/RequestLine.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -27,9 +27,11 @@
 public class RequestLine
 {
 
+    public static final char PARAMETERS_SEPARATOR = '?';
     private HttpVersion httpversion = null;
     private String method = null;
     private String uri = null;
+    private String uriWithoutParams;
 
     public static RequestLine parseLine(final String l) throws HttpException
     {
@@ -38,7 +40,7 @@
         String protocol;
         try
         {
-            if (l==null)
+            if (l == null)
             {
                 throw new ProtocolException(HttpMessages.requestLineIsMalformed(l).getMessage());
             }
@@ -75,7 +77,7 @@
     }
 
     public RequestLine(final String method, final String uri, final String httpversion)
-        throws ProtocolException
+            throws ProtocolException
     {
         this(method, uri, HttpVersion.parse(httpversion));
     }
@@ -125,4 +127,21 @@
         sb.append(this.httpversion);
         return sb.toString();
     }
+
+    /**
+     * @return the url without the request parameters
+     */
+    public String getUrlWithoutParams()
+    {
+        if (this.uriWithoutParams == null)
+        {
+            uriWithoutParams = getUri();
+            int i = uriWithoutParams.indexOf(PARAMETERS_SEPARATOR);
+            if (i > -1)
+            {
+                uriWithoutParams = uriWithoutParams.substring(0, i);
+            }
+        }
+        return uriWithoutParams;
+    }
 }

Added: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectionManagerTestCase.java (0 => 25119)


--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectionManagerTestCase.java	                        (rev 0)
+++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectionManagerTestCase.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -0,0 +1,122 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.http;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.mule.api.MuleContext;
+import org.mule.api.context.WorkManager;
+import org.mule.api.endpoint.EndpointException;
+import org.mule.endpoint.MuleEndpointURI;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.tck.size.SmallTest;
+import org.mule.transport.ConnectException;
+
+import java.io.IOException;
+import java.net.URI;
+
+import javax.resource.spi.work.ExecutionContext;
+import javax.resource.spi.work.WorkListener;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+@SmallTest
+public class HttpConnectionManagerTestCase extends AbstractMuleTestCase
+{
+
+    public static final String DEFAULT_ENDPOINT_URI = "http://localhost:1234/";
+    public static final String NESTED_ENDPOINT_URI_1 = "http://localhost:1234/service";
+    public static final String NESTED_ENDPOINT_URI_2 = "http://localhost:1234/service/order";
+    public static final String ANOTHER_ENDPOINT_URI = "http://localhost:1235/service";
+    public static final String ANOTHER_NESTED_ENDPOINT_URI = "http://localhost:1235/service/order";
+
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private WorkManager mockWorkManager;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private HttpConnector mockHttpConnector;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MuleContext mockMuleContext;
+
+    @Test(expected = IllegalArgumentException.class)
+    public void constructorWithNullWorkManager()
+    {
+        new HttpConnectionManager(mockHttpConnector, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void constructorWithNullConnector()
+    {
+        new HttpConnectionManager(null, mockWorkManager);
+    }
+
+    @Test(expected = ConnectException.class)
+    public void workSchedulingFails() throws Exception
+    {
+        when(mockHttpConnector.getServerSocket(any(URI.class))).thenThrow(IOException.class);
+        createConnectionManagerAndAddDefaultEndpointUri();
+    }
+
+    @Test
+    public void removeConnectionWithoutDispatcherDoesntFail() throws Exception
+    {
+        HttpConnectionManager connectionManager = new HttpConnectionManager(mockHttpConnector, mockWorkManager);
+        connectionManager.removeConnection(new MuleEndpointURI("http://localhost:1234/service/path", mockMuleContext));
+    }
+
+    @Test
+    public void addConnectionStartsSocketDispatcher() throws Exception
+    {
+        createConnectionManagerAndAddDefaultEndpointUri();
+        verify(mockWorkManager, times(1)).scheduleWork(any(HttpRequestDispatcher.class), anyLong(), any(ExecutionContext.class), any(WorkListener.class));
+    }
+
+    @Test
+    public void add3EndpointUrisToSameHostPortOnlyExecutesOneDispatcher() throws Exception
+    {
+        HttpConnectionManager connectionManager = createConnectionManagerAndAddDefaultEndpointUri();
+        connectionManager.addConnection(createEndpointUri(NESTED_ENDPOINT_URI_1));
+        connectionManager.addConnection(createEndpointUri(NESTED_ENDPOINT_URI_2));
+        verify(mockWorkManager, times(1)).scheduleWork(any(HttpRequestDispatcher.class), anyLong(), any(ExecutionContext.class), any(WorkListener.class));
+    }
+
+    @Test
+    public void addEndpointsToDifferentHostPortOpensSeveralConnections() throws Exception
+    {
+        HttpConnectionManager connectionManager = createConnectionManagerAndAddDefaultEndpointUri();
+        connectionManager.addConnection(createEndpointUri(NESTED_ENDPOINT_URI_1));
+        connectionManager.addConnection(createEndpointUri(NESTED_ENDPOINT_URI_2));
+        connectionManager.addConnection(createEndpointUri(ANOTHER_ENDPOINT_URI));
+        connectionManager.addConnection(createEndpointUri(ANOTHER_NESTED_ENDPOINT_URI));
+        verify(mockWorkManager, times(2)).scheduleWork(any(HttpRequestDispatcher.class), anyLong(), any(ExecutionContext.class), any(WorkListener.class));
+    }
+
+    private HttpConnectionManager createConnectionManagerAndAddDefaultEndpointUri() throws ConnectException, EndpointException
+    {
+        HttpConnectionManager connectionManager = new HttpConnectionManager(mockHttpConnector, mockWorkManager);
+        connectionManager.addConnection(createEndpointUri(DEFAULT_ENDPOINT_URI));
+        return connectionManager;
+    }
+
+    private MuleEndpointURI createEndpointUri(String uri) throws EndpointException
+    {
+        return new MuleEndpointURI(uri, mockMuleContext);
+    }
+
+
+}
Property changes on: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectionManagerTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectorTestCase.java (25118 => 25119)


--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectorTestCase.java	2012-12-10 19:56:53 UTC (rev 25118)
+++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectorTestCase.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -10,21 +10,52 @@
 
 package org.mule.transport.http;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
 import org.mule.api.endpoint.InboundEndpoint;
 import org.mule.api.service.Service;
 import org.mule.api.transport.Connector;
+import org.mule.api.transport.MessageReceiver;
 import org.mule.tck.testmodels.fruit.Orange;
 import org.mule.transport.AbstractConnectorTestCase;
 import org.mule.transport.tcp.TcpConnector;
 
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
+import org.hamcrest.core.Is;
+import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
 
-import static org.junit.Assert.assertEquals;
-
+@RunWith(MockitoJUnitRunner.class)
 public class HttpConnectorTestCase extends AbstractConnectorTestCase
 {
 
+    @Mock
+    private HttpMessageReceiver mockServiceOrderReceiverPort5555;
+    @Mock
+    private HttpMessageReceiver mockServiceReceiverPort5555;
+    @Mock
+    private HttpMessageReceiver mockServiceOrderReceiverPort7777;
+    @Mock
+    private HttpMessageReceiver mockServiceReceiverPort7777;
+    @Mock
+    private HttpMessageReceiver mockServiceReceiverAnotherHost;
+    @Mock
+    private HttpMessageReceiver mockReceiverPort5555;
+    @Mock
+    private HttpRequest mockHttpRequest;
+    @Mock
+    private Socket mockSocket;
+
     @Override
     public Connector createConnector() throws Exception
     {
@@ -48,7 +79,7 @@
     {
         Service service = getTestService("orange", Orange.class);
         InboundEndpoint endpoint = muleContext.getEndpointFactory().getInboundEndpoint(
-            getTestEndpointURI());
+                getTestEndpointURI());
 
         getConnector().registerListener(endpoint, getSensingNullMessageProcessor(), service);
     }
@@ -71,4 +102,61 @@
         // all kinds of timeouts are now being tested in TcpConnectorTestCase
     }
 
+    @Test
+    public void findReceiverByStem() throws Exception
+    {
+        Map<Object, MessageReceiver> receiversMap = createTestReceivers();
+        assertThat((HttpMessageReceiver) HttpConnector.findReceiverByStem(receiversMap, "http://somehost:5555/"), Is.is(mockReceiverPort5555));
+        assertThat((HttpMessageReceiver) HttpConnector.findReceiverByStem(receiversMap, "http://somehost:5555/service"), Is.is(mockServiceReceiverPort5555));
+        assertThat((HttpMessageReceiver) HttpConnector.findReceiverByStem(receiversMap, "http://somehost:5555/service/order"), Is.is(mockServiceOrderReceiverPort5555));
+        assertThat((HttpMessageReceiver) HttpConnector.findReceiverByStem(receiversMap, "http://somehost:7777/service/order"), Is.is(mockServiceOrderReceiverPort7777));
+        assertThat((HttpMessageReceiver) HttpConnector.findReceiverByStem(receiversMap, "http://somehost:7777/service"), Is.is(mockServiceReceiverPort7777));
+        assertThat((HttpMessageReceiver) HttpConnector.findReceiverByStem(receiversMap, "http://anotherhost:5555/"), Is.is(mockServiceReceiverAnotherHost));
+    }
+
+    private Map<Object, MessageReceiver> createTestReceivers()
+    {
+        Map<Object, MessageReceiver> receiversMap = new HashMap<Object, MessageReceiver>();
+        receiversMap.put("http://somehost:5555/service/order", mockServiceOrderReceiverPort5555);
+        receiversMap.put("http://somehost:5555/service", mockServiceReceiverPort5555);
+        receiversMap.put("http://somehost:5555/", mockReceiverPort5555);
+        receiversMap.put("http://somehost:7777/service/order", mockServiceOrderReceiverPort7777);
+        receiversMap.put("http://somehost:7777/service", mockServiceReceiverPort7777);
+        receiversMap.put("http://anotherhost:5555/", mockServiceReceiverAnotherHost);
+        return receiversMap;
+    }
+
+    @Test
+    public void lookupReceiverThatDoesNotExistsInThatPort() throws Exception
+    {
+        testLookupReceiver("somehost", 8888, "/management", null);
+    }
+
+    @Test
+    public void lookupReceiverThatDoesNotExistsInThatHost() throws Exception
+    {
+        testLookupReceiver("nonexistenthost", 5555, "/service", null);
+    }
+
+    @Test
+    public void lookupReceiverThatContainsPath() throws Exception
+    {
+        testLookupReceiver("somehost", 5555, "/service/product", mockServiceReceiverPort5555);
+    }
+
+    @Test
+    public void lookupReceiverThatExistsWithExactSamePath() throws Exception
+    {
+        testLookupReceiver("somehost", 5555, "/service/order?param1=value1", mockServiceOrderReceiverPort5555);
+    }
+
+    private void testLookupReceiver(String host, int port, String path, HttpMessageReceiver expectedMessageReceiver)
+    {
+        HttpConnector httpConnector = (HttpConnector) getConnector();
+        httpConnector.getReceivers().putAll(createTestReceivers());
+        when(mockHttpRequest.getUrlWithoutParams()).thenReturn(path);
+        when(mockSocket.getLocalSocketAddress()).thenReturn(new InetSocketAddress(host, port));
+        Assert.assertThat(httpConnector.lookupReceiver(mockSocket, mockHttpRequest), Is.is(expectedMessageReceiver));
+    }
+
 }

Added: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherTestCase.java (0 => 25119)


--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherTestCase.java	                        (rev 0)
+++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherTestCase.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -0,0 +1,216 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.http;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.mule.api.context.WorkManager;
+import org.mule.api.exception.SystemExceptionHandler;
+import org.mule.api.retry.RetryCallback;
+import org.mule.api.retry.RetryContext;
+import org.mule.api.retry.RetryPolicyTemplate;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.tck.size.SmallTest;
+import org.mule.transport.AbstractConnector;
+import org.mule.transport.ConnectorLifecycleManager;
+import org.mule.util.concurrent.Latch;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.util.concurrent.TimeUnit;
+
+import javax.resource.spi.work.ExecutionContext;
+import javax.resource.spi.work.WorkListener;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+@SmallTest
+public class HttpRequestDispatcherTestCase extends AbstractMuleTestCase
+{
+
+    public static final int WAIT_TIME = 5000;
+
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private HttpConnector mockHttpConnector;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private WorkManager mockWorkManager;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private ServerSocket mockServerSocket;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private RetryPolicyTemplate mockRetryTemplate;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private ConnectorLifecycleManager mockConnectorLifecycleManager;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private RetryContext mockRetryContext;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private SystemExceptionHandler mockExceptionListener;
+
+
+    @Test(expected = IllegalArgumentException.class)
+    public void createHttpSocketDispatcherWithNullConnector()
+    {
+        new HttpRequestDispatcher(null, mockRetryTemplate, mockServerSocket, mockWorkManager);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void createHttpSocketDispatcherWithNullRetryPolicyTemplate()
+    {
+        new HttpRequestDispatcher(mockHttpConnector, null, mockServerSocket, mockWorkManager);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void createHttpSocketDispatcherWithNullServerSocket()
+    {
+        new HttpRequestDispatcher(mockHttpConnector, mockRetryTemplate, null, mockWorkManager);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void createHttpSocketDispatcherWithNullWorkManager()
+    {
+        new HttpRequestDispatcher(mockHttpConnector, mockRetryTemplate, mockServerSocket, null);
+    }
+
+    @Test
+    public void closeServerSocketWhenDisconnect() throws IOException
+    {
+        HttpRequestDispatcher httpRequestDispatcher = new HttpRequestDispatcher(mockHttpConnector, mockRetryTemplate, mockServerSocket, mockWorkManager);
+        httpRequestDispatcher.disconnect();
+        verify(mockServerSocket, times(1)).close();
+    }
+
+    @Test
+    public void whenFailureCallSystemExceptionHandler() throws Exception
+    {
+        final HttpRequestDispatcher httpRequestDispatcher = new HttpRequestDispatcher(mockHttpConnector, mockRetryTemplate, mockServerSocket, mockWorkManager);
+        final Latch acceptCalledLath = new Latch();
+        sustituteLifecycleManager();
+        when(mockConnectorLifecycleManager.getState().isStarted()).thenReturn(true);
+        when(mockRetryTemplate.execute(any(RetryCallback.class), any(WorkManager.class))).thenAnswer(new Answer<Object>()
+        {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                acceptCalledLath.release();
+                throw new Exception();
+            }
+        });
+        when(mockHttpConnector.getMuleContext().getExceptionListener()).thenReturn(mockExceptionListener);
+        Thread dispatcherThread = createDispatcherThread(httpRequestDispatcher);
+        try
+        {
+            dispatcherThread.start();
+            if (!acceptCalledLath.await(WAIT_TIME, TimeUnit.MILLISECONDS))
+            {
+                fail("retry template should be executed");
+            }
+            verify(mockExceptionListener, Mockito.atLeast(1)).handleException(Mockito.isA(Exception.class));
+        }
+        finally
+        {
+            httpRequestDispatcher.disconnect();
+        }
+    }
+
+    @Test
+    public void whenConnectorIsNotStartedDoNotAcceptSockets() throws Exception
+    {
+        HttpRequestDispatcher httpRequestDispatcher = new HttpRequestDispatcher(mockHttpConnector, mockRetryTemplate, mockServerSocket, mockWorkManager);
+        sustituteLifecycleManager();
+        when(mockConnectorLifecycleManager.getState().isStarted()).thenReturn(false);
+        when(mockHttpConnector.isStarted()).thenReturn(false);
+        Thread dispatcherThread = createDispatcherThread(httpRequestDispatcher);
+        try
+        {
+            dispatcherThread.start();
+            verify(mockWorkManager, times(0)).scheduleWork(any(HttpRequestDispatcherWork.class), anyLong(), any(ExecutionContext.class), any(WorkListener.class));
+        }
+        finally
+        {
+            httpRequestDispatcher.disconnect();
+        }
+    }
+
+
+    @Test
+    public void whenSocketAcceptedExecuteWork() throws Exception
+    {
+        final HttpRequestDispatcher httpRequestDispatcher = new HttpRequestDispatcher(mockHttpConnector, mockRetryTemplate, mockServerSocket, mockWorkManager);
+        final Latch acceptCalledLath = new Latch();
+        sustituteLifecycleManager();
+        when(mockConnectorLifecycleManager.getState().isStarted()).thenReturn(true);
+        when(mockRetryTemplate.execute(any(RetryCallback.class), any(WorkManager.class))).thenAnswer(new Answer<RetryContext>()
+        {
+            @Override
+            public RetryContext answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                ((RetryCallback) invocationOnMock.getArguments()[0]).doWork(mockRetryContext);
+                return null;
+            }
+        });
+        Mockito.doAnswer(new Answer<Object>()
+        {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                acceptCalledLath.release();
+                return null;
+            }
+        }).when(mockWorkManager).scheduleWork(any(HttpRequestDispatcherWork.class), anyLong(), any(ExecutionContext.class), any(WorkListener.class));
+        Thread dispatcherThread = createDispatcherThread(httpRequestDispatcher);
+        dispatcherThread.start();
+        try
+        {
+            if (!acceptCalledLath.await(500, TimeUnit.MILLISECONDS))
+            {
+                fail("Work should have been scheduled");
+            }
+        }
+        finally
+        {
+            httpRequestDispatcher.disconnect();
+        }
+
+    }
+
+    private void sustituteLifecycleManager() throws NoSuchFieldException, IllegalAccessException
+    {
+        Field filed = AbstractConnector.class.getDeclaredField("lifecycleManager");
+        filed.setAccessible(true);
+        filed.set(mockHttpConnector, mockConnectorLifecycleManager);
+    }
+
+    private Thread createDispatcherThread(final HttpRequestDispatcher httpRequestDispatcher)
+    {
+        Thread requestDispatcherThread = new Thread()
+        {
+            @Override
+            public void run()
+            {
+                httpRequestDispatcher.run();
+            }
+        };
+        requestDispatcherThread.setDaemon(true);
+        return requestDispatcherThread;
+    }
+}
Property changes on: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherWorkTestCase.java (0 => 25119)


--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherWorkTestCase.java	                        (rev 0)
+++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherWorkTestCase.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -0,0 +1,106 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.http;
+
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.mule.tck.size.SmallTest;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import javax.resource.spi.work.Work;
+
+import org.hamcrest.core.Is;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+@SmallTest
+public class HttpRequestDispatcherWorkTestCase
+{
+
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private HttpConnector mockHttpConnector;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private Socket mockSocket;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private InetSocketAddress mockInetSocketAddress;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private HttpMessageReceiver mockHttpMessageReceiver;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private Work mockWork;
+
+    @Test(expected = IllegalArgumentException.class)
+    public void createHttpRequestDispatcherWorkWithNullHttpConnector()
+    {
+        new HttpRequestDispatcherWork(null, mockSocket);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void createHttpRequestDispatcherWorkWithNullServerSocket()
+    {
+        new HttpRequestDispatcherWork(mockHttpConnector, null);
+    }
+
+    @Test
+    public void onExceptionCallSystemExceptionHandler() throws Exception
+    {
+        HttpRequestDispatcherWork httpRequestDispatcherWork = new HttpRequestDispatcherWork(mockHttpConnector, mockSocket);
+        setUpSocketMessage();
+        when(mockHttpConnector.lookupReceiver(any(Socket.class), any(HttpRequest.class))).thenThrow(Exception.class);
+        httpRequestDispatcherWork.run();
+        verify(mockHttpConnector.getMuleContext().getExceptionListener(), times(1)).handleException(any(Exception.class));
+    }
+
+    @Test
+    public void requestPathWithNoReceiver() throws Exception
+    {
+        HttpRequestDispatcherWork httpRequestDispatcherWork = new HttpRequestDispatcherWork(mockHttpConnector, mockSocket);
+        setUpSocketMessage();
+        when(mockSocket.getLocalSocketAddress()).thenReturn(mockInetSocketAddress);
+        when(mockHttpConnector.lookupReceiver(any(Socket.class), any(HttpRequest.class))).thenReturn(null);
+        ByteArrayOutputStream socketOutput = new ByteArrayOutputStream();
+        when(mockSocket.getOutputStream()).thenReturn(socketOutput);
+        httpRequestDispatcherWork.run();
+        String response = new String(socketOutput.toByteArray());
+        assertThat(response.startsWith("HTTP/1.0 404 Not Found"), Is.is(true));
+    }
+
+    @Test
+    public void onValidUriProcessRequest() throws Exception
+    {
+        HttpRequestDispatcherWork httpRequestDispatcherWork = new HttpRequestDispatcherWork(mockHttpConnector, mockSocket);
+        when(mockHttpConnector.lookupReceiver(isA(Socket.class), isA(HttpRequest.class))).thenReturn(mockHttpMessageReceiver);
+        setUpSocketMessage();
+        when(mockHttpMessageReceiver.createWork(isA(HttpServerConnection.class))).thenReturn(mockWork);
+        httpRequestDispatcherWork.run();
+        verify(mockWork, times(1)).run();
+
+    }
+
+    private void setUpSocketMessage() throws IOException
+    {
+        when(mockHttpConnector.getMuleContext().getConfiguration().getDefaultEncoding()).thenReturn("UTF-8");
+        when(mockSocket.getInputStream()).thenReturn(new ByteArrayInputStream("GET /path/to/file/index.html HTTP/1.0\n".getBytes()));
+    }
+
+}
Property changes on: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherWorkTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpServerConnectionTestCase.java (0 => 25119)


--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpServerConnectionTestCase.java	                        (rev 0)
+++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpServerConnectionTestCase.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -0,0 +1,137 @@
+package org.mule.transport.http;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.tck.size.SmallTest;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.security.cert.Certificate;
+
+import javax.net.ssl.HandshakeCompletedEvent;
+import javax.net.ssl.HandshakeCompletedListener;
+import javax.net.ssl.SSLSocket;
+
+import org.hamcrest.core.IsNull;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+@SmallTest
+public class HttpServerConnectionTestCase extends AbstractMuleTestCase
+{
+
+    public static final String ENCODING = "UTF-8";
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private Socket mockSocket;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private SSLSocket mockSslSocket;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private HttpConnector mockHttpConnector;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private HandshakeCompletedEvent mockHandshakeCompleteEvent;
+    private Certificate[] mockLocalCertificate = new Certificate[2];
+    private Certificate[] mockPeerCertificates = new Certificate[2];
+
+    @Test
+    public void createHttpServerConnectionWithSslSocket() throws IOException
+    {
+        when(mockHandshakeCompleteEvent.getLocalCertificates()).thenReturn(mockLocalCertificate);
+        when(mockHandshakeCompleteEvent.getPeerCertificates()).thenReturn(mockPeerCertificates);
+        doAnswer(new Answer<Void>()
+        {
+            @Override
+            public Void answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                ((HandshakeCompletedListener) invocationOnMock.getArguments()[0]).handshakeCompleted(mockHandshakeCompleteEvent);
+                return null;
+            }
+        }).when(mockSslSocket).addHandshakeCompletedListener(any(HandshakeCompletedListener.class));
+        HttpServerConnection httpServerConnection = new HttpServerConnection(mockSslSocket, ENCODING, mockHttpConnector);
+        verify(mockSslSocket, times(1)).addHandshakeCompletedListener(httpServerConnection);
+        assertThat(httpServerConnection.getLocalCertificateChain(), is(mockLocalCertificate));
+        assertThat(httpServerConnection.getPeerCertificateChain(), is(mockPeerCertificates));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void createHttpServerConnectionWithSocketAndFailForLocalCertificates() throws Exception
+    {
+        HttpServerConnection httpServerConnection = new HttpServerConnection(mockSocket, ENCODING, mockHttpConnector);
+        httpServerConnection.getLocalCertificateChain();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void createHttpServerConnectionWithSocketAndFailForPeerCertificates() throws Exception
+    {
+        HttpServerConnection httpServerConnection = new HttpServerConnection(mockSocket, ENCODING, mockHttpConnector);
+        httpServerConnection.getPeerCertificateChain();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void createHttpServerConnectionWithSocketAndFailForHandshakeLatch() throws Exception
+    {
+        HttpServerConnection httpServerConnection = new HttpServerConnection(mockSocket, ENCODING, mockHttpConnector);
+        httpServerConnection.getSslSocketHandshakeCompleteLatch();
+    }
+
+    @Test
+    public void resetConnectionReadNextRequest() throws Exception
+    {
+        when(mockSocket.getInputStream()).thenReturn(new ByteArrayInputStream(String.format("GET %s HTTP/1.1\n\nGET %s HTTP/1.1\n", "/service/order?param1=value1&param2=value2", "/?param1=value1&param2=value2").getBytes()));
+        HttpServerConnection httpServerConnection = new HttpServerConnection(mockSocket, ENCODING, mockHttpConnector);
+        assertThat(httpServerConnection.getUrlWithoutRequestParams(), is("/service/order"));
+        httpServerConnection.reset();
+        assertThat(httpServerConnection.getUrlWithoutRequestParams(), is("/"));
+    }
+
+    @Test
+    public void getRemoteSocketAddressWithNullSocketAddress() throws Exception
+    {
+        HttpServerConnection httpServerConnection = new HttpServerConnection(mockSocket, ENCODING, mockHttpConnector);
+        when(mockSocket.getRemoteSocketAddress()).thenReturn(null);
+        assertThat(httpServerConnection.getRemoteClientAddress(), IsNull.nullValue());
+    }
+
+    @Test
+    public void getRemoteSocketAddress() throws Exception
+    {
+        HttpServerConnection httpServerConnection = new HttpServerConnection(mockSocket, ENCODING, mockHttpConnector);
+        when(mockSocket.getRemoteSocketAddress()).thenReturn(new InetSocketAddress("host", 1000));
+        assertThat(httpServerConnection.getRemoteClientAddress(), is("host:1000"));
+    }
+
+    @Test
+    public void getUrlWithoutRequestParams() throws Exception
+    {
+        testUrlWithoutParams("/service/order?param1=value1&param2=value2", "/service/order");
+        testUrlWithoutParams("/service/order", "/service/order");
+        testUrlWithoutParams("/service?param1=value1&param2=value2", "/service");
+        testUrlWithoutParams("/?param1=value1&param2=value2", "/");
+        testUrlWithoutParams("/", "/");
+    }
+
+    private void testUrlWithoutParams(String requestUrl, String expectedUrlWithoutParams) throws IOException
+    {
+        when(mockSocket.getInputStream()).thenReturn(new ByteArrayInputStream(String.format("GET %s HTTP/1.0\n", requestUrl).getBytes()));
+        HttpServerConnection httpServerConnection = new HttpServerConnection(mockSocket, ENCODING, mockHttpConnector);
+        String urlWithoutParams = httpServerConnection.getUrlWithoutRequestParams();
+        assertThat(urlWithoutParams, is(expectedUrlWithoutParams));
+    }
+
+}
+
+
Property changes on: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpServerConnectionTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Added: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/RequestLineTestCase.java (0 => 25119)


--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/RequestLineTestCase.java	                        (rev 0)
+++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/RequestLineTestCase.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -0,0 +1,34 @@
+package org.mule.transport.http;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import org.mule.tck.junit4.AbstractMuleTestCase;
+
+import org.apache.commons.httpclient.HttpVersion;
+import org.junit.Test;
+
+public class RequestLineTestCase extends AbstractMuleTestCase
+{
+
+    @Test
+    public void getWithoutParamsWithNoParams()
+    {
+        RequestLine requestLine = new RequestLine("GET", "/server/order", HttpVersion.HTTP_1_1);
+        assertThat(requestLine.getUrlWithoutParams(), is("/server/order"));
+    }
+
+    @Test
+    public void getWithoutParamsWithParams()
+    {
+        RequestLine requestLine = new RequestLine("GET", "/server/order?param1=value1", HttpVersion.HTTP_1_1);
+        assertThat(requestLine.getUrlWithoutParams(), is("/server/order"));
+    }
+
+    @Test
+    public void getWithoutParamsWithParamsInRootPath()
+    {
+        RequestLine requestLine = new RequestLine("GET", "/?param1=value1", HttpVersion.HTTP_1_1);
+        assertThat(requestLine.getUrlWithoutParams(), is("/"));
+    }
+}
Property changes on: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/RequestLineTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpsHandshakeTimingTestCase.java (25118 => 25119)


--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpsHandshakeTimingTestCase.java	2012-12-10 19:56:53 UTC (rev 25118)
+++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpsHandshakeTimingTestCase.java	2012-12-10 21:20:57 UTC (rev 25119)
@@ -26,6 +26,8 @@
 import org.mule.api.service.Service;
 import org.mule.api.transport.Connector;
 import org.mule.tck.junit4.AbstractMuleContextTestCase;
+import org.mule.transport.http.HttpConnector;
+import org.mule.transport.http.HttpServerConnection;
 import org.mule.transport.http.HttpsConnector;
 import org.mule.transport.http.HttpsMessageReceiver;
 import org.mule.transport.ssl.MockHandshakeCompletedEvent;
@@ -34,7 +36,6 @@
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.net.Socket;
 import java.util.Collections;
 import java.util.Map;
 
@@ -54,13 +55,14 @@
  */
 public class HttpsHandshakeTimingTestCase extends AbstractMuleContextTestCase
 {
+
     @Test
     public void testHttpsHandshakeExceedsTimeout() throws Exception
     {
         MockHttpsMessageReceiver messageReceiver = setupMockHttpsMessageReceiver();
 
         MockSslSocket socket = new MockSslSocket();
-        Work work = messageReceiver.createWork(socket);
+        Work work = messageReceiver.createWork(new HttpServerConnection(socket, messageReceiver.getEndpoint().getEncoding(), (HttpConnector) messageReceiver.getConnector()));
         assertNotNull(work);
 
         MuleMessage message = new DefaultMuleMessage(TEST_MESSAGE, muleContext);
@@ -84,23 +86,21 @@
         MockHttpsMessageReceiver messageReceiver = setupMockHttpsMessageReceiver();
 
         MockSslSocket socket = new MockSslSocket();
-        Work work = messageReceiver.createWork(socket);
+        HttpServerConnection serverConnection = new HttpServerConnection(socket, messageReceiver.getEndpoint().getEncoding(), (HttpConnector) messageReceiver.getConnector());
+        Work work = messageReceiver.createWork(serverConnection);
         assertNotNull(work);
 
-        invokeHandshakeCompleted(work, socket);
+        invokeHandshakeCompleted(serverConnection, socket);
 
         MuleMessage message = new DefaultMuleMessage(TEST_MESSAGE, muleContext);
         invokePreRouteMessage(work, message);
         assertNotNull(message.<Object>getInboundProperty(MuleProperties.MULE_REMOTE_CLIENT_ADDRESS));
     }
 
-    private void invokeHandshakeCompleted(Work work, MockSslSocket socket) throws Exception
+    private void invokeHandshakeCompleted(HttpServerConnection serverConnection, MockSslSocket socket) throws Exception
     {
-        Method handshakeCompleted = work.getClass().getDeclaredMethod("handshakeCompleted", HandshakeCompletedEvent.class);
-        assertNotNull(handshakeCompleted);
-        handshakeCompleted.setAccessible(true);
         HandshakeCompletedEvent event = new MockHandshakeCompletedEvent(socket);
-        handshakeCompleted.invoke(work, new Object[] { event });
+        serverConnection.handshakeCompleted(event);
     }
 
     private void invokePreRouteMessage(Work work, MuleMessage message) throws Exception
@@ -108,7 +108,7 @@
         Method preRouteMessage = work.getClass().getDeclaredMethod("preRouteMessage", MuleMessage.class);
         assertNotNull(preRouteMessage);
         preRouteMessage.setAccessible(true);
-        preRouteMessage.invoke(work, new Object[] { message });
+        preRouteMessage.invoke(work, new Object[] {message});
     }
 
     private MockHttpsMessageReceiver setupMockHttpsMessageReceiver() throws CreateException
@@ -128,8 +128,9 @@
 
     private static class MockHttpsMessageReceiver extends HttpsMessageReceiver
     {
+
         public MockHttpsMessageReceiver(Connector connector, FlowConstruct flowConstruct,
-            InboundEndpoint endpoint) throws CreateException
+                                        InboundEndpoint endpoint) throws CreateException
         {
             super(connector, flowConstruct, endpoint);
         }
@@ -138,9 +139,9 @@
          * Open up access for unit test
          */
         @Override
-        public Work createWork(Socket socket) throws IOException
+        public Work createWork(HttpServerConnection httpServerConnection) throws IOException
         {
-            return super.createWork(socket);
+            return super.createWork(httpServerConnection);
         }
     }
 }

To unsubscribe from this list please visit:

http://xircles.codehaus.org/manage_email