Grizzly http client race condition

classic Classic list List threaded Threaded
18 messages Options
Reply | Threaded
Open this post in threaded view
|

Grizzly http client race condition

testn
I'm not sure whether anyone has encountered this before. Basically, there seems to be a race condition in FeedableBodyGenerator in grizzly-http-client 1.8 that it uploaded a corrupted data. When I use this code to upload the data, the uploaded data will not be the same as the input file. However, if I use setBody(InputStream) method instead, it works without any problem. This seems to be caused by a race condition in the main thread and the grizzly IO thread. That seems that Feeder overwrites the previous chunk before it has a chance to write that down to the socket

import com.ning.http.client.*;
import com.ning.http.client.providers.grizzly.FeedableBodyGenerator;
import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider;
import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProviderConfig;
import com.ning.http.client.providers.grizzly.TransportCustomizer;
import org.glassfish.grizzly.filterchain.FilterChainBuilder;
import org.glassfish.grizzly.memory.Buffers;
import org.glassfish.grizzly.memory.MemoryManager;
import org.glassfish.grizzly.nio.transport.TCPNIOTransport;

import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;

public class Main {
    public static void main(String[] args) throws Exception {
        GrizzlyAsyncHttpProviderConfig httpConfig = new GrizzlyAsyncHttpProviderConfig();
        httpConfig.addProperty(GrizzlyAsyncHttpProviderConfig.Property.TRANSPORT_CUSTOMIZER, new TransportCustomizer() {
            public void customize(TCPNIOTransport tcpnioTransport, FilterChainBuilder filterChainBuilder) {
                tcpnioTransport.setOptimizedForMultiplexing(true);
            }
        });
        AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder();
        builder.setIOThreadMultiplier(1);
//        builder.setProxyServer(new ProxyServer("127.0.0.1", 8080));
        builder.setAsyncHttpClientProviderConfig(httpConfig);
        AsyncHttpClientConfig config = builder.build();
        GrizzlyAsyncHttpProvider provider = new GrizzlyAsyncHttpProvider(config);
        AsyncHttpClient client = new AsyncHttpClient(provider);
        AsyncHttpClient.BoundRequestBuilder requestBuilder = client.preparePut("http://192.168.8.80:8080/v1/AUTH_test/test/test");

        final FeedableBodyGenerator bodyGenerator = new FeedableBodyGenerator();
        final int chunkSize = 8192;
        bodyGenerator.setMaxPendingBytes(chunkSize);
        final FeedableBodyGenerator.Feeder feeder = new FeedableBodyGenerator.SimpleFeeder(bodyGenerator) {
            @Override
            public void flush() throws IOException {
                try (FileInputStream s = new FileInputStream("/tmp/largefile")) {
                    byte[] buf = new byte[chunkSize];
                    while (s.read(buf) > 0) {
                        feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER, buf), false);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER, buf, 0, 0), true);
                }
            }
        };
        bodyGenerator.setFeeder(feeder);
        requestBuilder.setBody(bodyGenerator);
//        FileInputStream stream = new FileInputStream("/usr/share/oracle/parallel-0ddea0/data/1432618799130-8");
//        FilterInputStream wrapper = new FilterInputStream(stream) {
//            @Override
//            public int read() throws IOException {
//                return super.read();
//            }
//
//            @Override
//            public int read(byte[] b) throws IOException {
//                return super.read(b);
//            }
//
//            @Override
//            public int read(byte[] b, int off, int len) throws IOException {
//                return super.read(b, off, len);
//            }
//        };
//        requestBuilder.setBody(wrapper);
        Request request = requestBuilder.build();
        ListenableFuture<Response> future = client.executeRequest(request);
        Response response = future.get();
        System.out.println(response.getHeader("Etag"));
        client.close();
    }
}
Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

testn
Note that to reproduce the problem, remove the following line:

                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

testn
This bug seems to be caused by a race condition between main thread and kernel worker thread
1. Feeder keeps feeding the data from the main thread and buffer is reused by Feeder
2. Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER, buffer) seems to wrap around buffer byte array rather than creating a new Buffer
3. AsyncWriteQueueRecord is created in TCPNIOAsyncQueueWriter. AsyncWriteQueueRecord holds a reference to message where it holds the Buffer. If the write queue is not full and the record gets processed by the caller thread immediately, everything works correctly.
4. If the TCP socket is full or TCPNIOTransport.setOptimizedForMultiplexing(true), and the write is enqueued to be processed when the socket buffer is not full
5. When the socket can receive more data, it will trigger a selector to execute in a kernel thread to process the remaining message. However, since the buffer is getting reused, the message to be processed may have been overwritten by Feeder in the main thread.
Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

oleksiys
Administrator
In reply to this post by testn
Hi,

it's expected behavior, Grizzly Buffer, which is backed by a byte[]
could be written asynchronously, that's why the payload could be broken
(just saw your analysis and it's 100% correct).
Why don't you use simple requestBuilder.setFile(File)?

WBR,
Alexey.

On 26.05.15 19:57, testn wrote:

> I'm not sure whether anyone has encountered this before. Basically, there
> seems to be a race condition in FeedableBodyGenerator in grizzly-http-client
> 1.8 that it uploaded a corrupted data. When I use this code to upload the
> data, the uploaded data will not be the same as the input file. However, if
> I use setBody(InputStream) method instead, it works without any problem.
> This seems to be caused by a race condition in the main thread and the
> grizzly IO thread. That seems that Feeder overwrites the previous chunk
> before it has a chance to write that down to the socket
>
> import com.ning.http.client.*;
> import com.ning.http.client.providers.grizzly.FeedableBodyGenerator;
> import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider;
> import
> com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProviderConfig;
> import com.ning.http.client.providers.grizzly.TransportCustomizer;
> import org.glassfish.grizzly.filterchain.FilterChainBuilder;
> import org.glassfish.grizzly.memory.Buffers;
> import org.glassfish.grizzly.memory.MemoryManager;
> import org.glassfish.grizzly.nio.transport.TCPNIOTransport;
>
> import java.io.FileInputStream;
> import java.io.FilterInputStream;
> import java.io.IOException;
>
> public class Main {
>      public static void main(String[] args) throws Exception {
>          GrizzlyAsyncHttpProviderConfig httpConfig = new
> GrizzlyAsyncHttpProviderConfig();
>        
> httpConfig.addProperty(GrizzlyAsyncHttpProviderConfig.Property.TRANSPORT_CUSTOMIZER,
> new TransportCustomizer() {
>              public void customize(TCPNIOTransport tcpnioTransport,
> FilterChainBuilder filterChainBuilder) {
>                  tcpnioTransport.setOptimizedForMultiplexing(true);
>              }
>          });
>          AsyncHttpClientConfig.Builder builder = new
> AsyncHttpClientConfig.Builder();
>          builder.setIOThreadMultiplier(1);
> //        builder.setProxyServer(new ProxyServer("127.0.0.1", 8080));
>          builder.setAsyncHttpClientProviderConfig(httpConfig);
>          AsyncHttpClientConfig config = builder.build();
>          GrizzlyAsyncHttpProvider provider = new
> GrizzlyAsyncHttpProvider(config);
>          AsyncHttpClient client = new AsyncHttpClient(provider);
>          AsyncHttpClient.BoundRequestBuilder requestBuilder =
> client.preparePut("http://192.168.8.80:8080/v1/AUTH_test/test/test");
>
>          final FeedableBodyGenerator bodyGenerator = new
> FeedableBodyGenerator();
>          final int chunkSize = 8192;
>          bodyGenerator.setMaxPendingBytes(chunkSize);
>          final FeedableBodyGenerator.Feeder feeder = new
> FeedableBodyGenerator.SimpleFeeder(bodyGenerator) {
>              @Override
>              public void flush() throws IOException {
>                  try (FileInputStream s = new
> FileInputStream("/tmp/largefile")) {
>                      byte[] buf = new byte[chunkSize];
>                      while (s.read(buf) > 0) {
>                        
> feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER, buf), false);
>                          try {
>                              Thread.sleep(1000);
>                          } catch (InterruptedException e) {
>                              e.printStackTrace();
>                          }
>                      }
>                      feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER,
> buf, 0, 0), true);
>                  }
>              }
>          };
>          bodyGenerator.setFeeder(feeder);
>          requestBuilder.setBody(bodyGenerator);
> //        FileInputStream stream = new
> FileInputStream("/usr/share/oracle/parallel-0ddea0/data/1432618799130-8");
> //        FilterInputStream wrapper = new FilterInputStream(stream) {
> //            @Override
> //            public int read() throws IOException {
> //                return super.read();
> //            }
> //
> //            @Override
> //            public int read(byte[] b) throws IOException {
> //                return super.read(b);
> //            }
> //
> //            @Override
> //            public int read(byte[] b, int off, int len) throws IOException
> {
> //                return super.read(b, off, len);
> //            }
> //        };
> //        requestBuilder.setBody(wrapper);
>          Request request = requestBuilder.build();
>          ListenableFuture<Response> future = client.executeRequest(request);
>          Response response = future.get();
>          System.out.println(response.getHeader("Etag"));
>          client.close();
>      }
> }
>
>
>
>
> --
> View this message in context: http://grizzly.1045725.n5.nabble.com/Grizzly-http-client-race-condition-tp5710847.html
> Sent from the Grizzly - Users mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

testn
That's how grizzly-http-client was written to use FeedableBodyGenerator even though it seems unnecessary to convert an InputStream into an OutputStream to feed it down to Grizzly again where Grizzly could have read the data directly from InputStream in worker thread.
Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

testn
Currently, I tried to work around by setting configureBlocking(true). However, it seems to introduce another issue where initialAsynchronousTransfer() seems to be called multiple times by different threads causing the following stack trace

Caused by: java.lang.IllegalStateException: Async transfer has already been initiated.
        at com.ning.http.client.providers.grizzly.FeedableBodyGenerator.initializeAsynchronousTransfer(FeedableBodyGenerator.java:156) ~[grizzly-http-client-1.8.15.jar:na]
        at com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider$BodyGeneratorBodyHandler.doHandle(GrizzlyAsyncHttpProvider.java:2419) ~[grizzly-http-client-1.8.15.jar:na]
        at com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider.sendRequest(GrizzlyAsyncHttpProvider.java:580) ~[grizzly-http-client-1.8.15.jar:na]
        at com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider$AsyncHttpClientFilter.sendAsGrizzlyRequest(GrizzlyAsyncHttpProvider.java:968) ~[grizzly-http-client-1.8.15.jar:na]
        at com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider$AsyncHttpClientFilter.handleWrite(GrizzlyAsyncHttpProvider.java:833) ~[grizzly-http-client-1.8.15.jar:na]
        at org.glassfish.grizzly.filterchain.ExecutorResolver$8.execute(ExecutorResolver.java:111) ~[grizzly-framework-2.3.16.jar:2.3.16]
        at org.glassfish.grizzly.filterchain.DefaultFilterChain.executeFilter(DefaultFilterChain.java:284) ~[grizzly-framework-2.3.16.jar:2.3.16]
        at org.glassfish.grizzly.filterchain.DefaultFilterChain.executeChainPart(DefaultFilterChain.java:201) ~[grizzly-framework-2.3.16.jar:2.3.16]
        at org.glassfish.grizzly.filterchain.DefaultFilterChain.execute(DefaultFilterChain.java:133) ~[grizzly-framework-2.3.16.jar:2.3.16]
        at org.glassfish.grizzly.filterchain.DefaultFilterChain.process(DefaultFilterChain.java:112) ~[grizzly-framework-2.3.16.jar:2.3.16]
Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

oleksiys
Administrator
Hi,

can you pls. share the testcase and provide more info on how you
expected it to work?

Thanks.

WBR,
Alexey.


On 28.05.15 19:44, testn wrote:

> Currently, I tried to work around by setting configureBlocking(true).
> However, it seems to introduce another issue where
> initialAsynchronousTransfer() seems to be called multiple times by different
> threads causing the following stack trace
>
> Caused by: java.lang.IllegalStateException: Async transfer has already been
> initiated.
> at
> com.ning.http.client.providers.grizzly.FeedableBodyGenerator.initializeAsynchronousTransfer(FeedableBodyGenerator.java:156)
> ~[grizzly-http-client-1.8.15.jar:na]
> at
> com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider$BodyGeneratorBodyHandler.doHandle(GrizzlyAsyncHttpProvider.java:2419)
> ~[grizzly-http-client-1.8.15.jar:na]
> at
> com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider.sendRequest(GrizzlyAsyncHttpProvider.java:580)
> ~[grizzly-http-client-1.8.15.jar:na]
> at
> com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider$AsyncHttpClientFilter.sendAsGrizzlyRequest(GrizzlyAsyncHttpProvider.java:968)
> ~[grizzly-http-client-1.8.15.jar:na]
> at
> com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider$AsyncHttpClientFilter.handleWrite(GrizzlyAsyncHttpProvider.java:833)
> ~[grizzly-http-client-1.8.15.jar:na]
> at
> org.glassfish.grizzly.filterchain.ExecutorResolver$8.execute(ExecutorResolver.java:111)
> ~[grizzly-framework-2.3.16.jar:2.3.16]
> at
> org.glassfish.grizzly.filterchain.DefaultFilterChain.executeFilter(DefaultFilterChain.java:284)
> ~[grizzly-framework-2.3.16.jar:2.3.16]
> at
> org.glassfish.grizzly.filterchain.DefaultFilterChain.executeChainPart(DefaultFilterChain.java:201)
> ~[grizzly-framework-2.3.16.jar:2.3.16]
> at
> org.glassfish.grizzly.filterchain.DefaultFilterChain.execute(DefaultFilterChain.java:133)
> ~[grizzly-framework-2.3.16.jar:2.3.16]
> at
> org.glassfish.grizzly.filterchain.DefaultFilterChain.process(DefaultFilterChain.java:112)
> ~[grizzly-framework-2.3.16.jar:2.3.16]
>
>
>
>
> --
> View this message in context: http://grizzly.1045725.n5.nabble.com/Grizzly-http-client-race-condition-tp5710847p5710861.html
> Sent from the Grizzly - Users mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

testn
I haven't been able to create a small testcase yet but it is pretty much reproducible every time. It looks like sendAsGrizzlyRequest is called multiple times. It seems to happen much more often if the content length from from FeedableBodyGenerator is 0
Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

oleksiys
Administrator
Unfortunately it's difficult to say what's wrong without a testcase :(

WBR,
Alexey.


On 29.05.15 08:37, testn wrote:

> I haven't been able to create a small testcase yet but it is pretty much
> reproducible every time. It looks like sendAsGrizzlyRequest is called
> multiple times. It seems to happen much more often if the content length
> from from FeedableBodyGenerator is 0
>
>
>
> --
> View this message in context: http://grizzly.1045725.n5.nabble.com/Grizzly-http-client-race-condition-tp5710847p5710864.html
> Sent from the Grizzly - Users mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

testn
I think I have a case where it happens.. it looks like it seems to occur when a PUT is requested but the server returns with 302 to redirect the request somewhere else. I haven't got a chance to generate the test case yet.
Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

testn
Ok. I tested it manually and I confirmed that Grizzly will try to feed the body again after the redirect thus the exception. The correct behavior is probably to perform a GET without the body or fail it?(http://serverfault.com/questions/391181/examples-of-302-vs-303)
Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

testn
Alexey,

Actually I just got a chance to run a write intensive workload. I found that configureBlocking(true) did help somewhat but the uploaded file is still corrupted. I'm still trying to nail down why that happens.
Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

testn
Alexey,

Since the existing code in Jersey/AHC uses FeedableBodyGenerator in a non-thread-safe way. What is your recommendation to address the problem?
Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

oleksiys
Administrator
Hmm, it supposed to be thread-safe if used properly.
Now it's either bug or just improper use.
I agree with your observation regarding 302 processing and that the
payload shouldn't be re-sent (it's something I have to take a look at),
but not sure I remember the testcase, that confirms, that
FeedableBodyGenerator is not thread-safe.

Thanks.

WBR,
Alexey.

On 05.06.15 15:47, testn wrote:

> Alexey,
>
> Since the existing code in Jersey/AHC uses FeedableBodyGenerator in a
> non-thread-safe way. What is your recommendation to address the problem?
>
>
>
> --
> View this message in context: http://grizzly.1045725.n5.nabble.com/Grizzly-http-client-race-condition-tp5710847p5710875.html
> Sent from the Grizzly - Users mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

testn
the part which is not thread-safe is in GrizzlyConnector.FeedAdapter where Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER, b, off, len) is passed to AsyncWriteQueueRecord without copying it. Do you think AbstractNIOAsyncQueueWriter.write should copy the record when it needs to enqueue the record to the queue when it cannot write inline?
Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

oleksiys
Administrator
Hi,

On 05.06.15 22:40, testn wrote:
> the part which is not thread-safe is in GrizzlyConnector.FeedAdapter where
> Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER, b, off, len) is passed to
> AsyncWriteQueueRecord without copying it. Do you think
> AbstractNIOAsyncQueueWriter.write should copy the record when it needs to
> enqueue the record to the queue when it cannot write inline?
Hmm, I don't think async write queue should copy it, because there are
cases where it's better to copy the buffer to simplify some usecases
like the one we're talking about, but sometimes the buffer is some
internal buffer, which doesn't have to be copied, because the sender
doesn't plan to reuse it.
Considering above, we don't copy the buffer and rely on sender logic to
take care of this situation.

Please share your Feeder code and I'll try to help you.

Thanks.

WBR,
Alexey.
Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

testn
The feeder code is from grizzly-http-client (aka async-http-client 1.8.15) and jersey-grizzly-connector

    /**
     * Utility OutputStream implementation that can feed Grizzly chunk-encoded body generator.
     */
    private class FeederAdapter extends OutputStream {

        final FeedableBodyGenerator.Feeder delegate;

        /**
         * Get me a new adapter for given feeder.
         *
         * @param bodyFeeder adaptee to get fed as an output stream.
         */
        FeederAdapter(FeedableBodyGenerator.Feeder bodyFeeder) {
            this.delegate = bodyFeeder;
        }

        @Override
        public void write(int b) throws IOException {
            final byte[] buffer = new byte[1];
            buffer[0] = (byte) b;
            delegate.feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER, buffer), false);
        }

        @Override
        public void write(byte[] b) throws IOException {
            delegate.feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER, b), false);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            delegate.feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER, b, off, len), false);
        }

        @Override
        public void close() throws IOException {
            delegate.feed(Buffers.EMPTY_BUFFER, true);
        }
    }

Reply | Threaded
Open this post in threaded view
|

Re: Grizzly http client race condition

oleksiys
Administrator
I see what you mean, it that case it's a bug in jersey connector, and it
has to be fixed.

I'll take care of it.

WBR,
Alexey.


On 11.06.15 20:50, testn wrote:

> The feeder code is from grizzly-http-client (aka async-http-client 1.8.15)
> and jersey-grizzly-connector
>
>      /**
>       * Utility OutputStream implementation that can feed Grizzly
> chunk-encoded body generator.
>       */
>      private class FeederAdapter extends OutputStream {
>
>          final FeedableBodyGenerator.Feeder delegate;
>
>          /**
>           * Get me a new adapter for given feeder.
>           *
>           * @param bodyFeeder adaptee to get fed as an output stream.
>           */
>          FeederAdapter(FeedableBodyGenerator.Feeder bodyFeeder) {
>              this.delegate = bodyFeeder;
>          }
>
>          @Override
>          public void write(int b) throws IOException {
>              final byte[] buffer = new byte[1];
>              buffer[0] = (byte) b;
>              delegate.feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER,
> buffer), false);
>          }
>
>          @Override
>          public void write(byte[] b) throws IOException {
>              delegate.feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER,
> b), false);
>          }
>
>          @Override
>          public void write(byte[] b, int off, int len) throws IOException {
>              delegate.feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER,
> b, off, len), false);
>          }
>
>          @Override
>          public void close() throws IOException {
>              delegate.feed(Buffers.EMPTY_BUFFER, true);
>          }
>      }
>
>
>
>
>
> --
> View this message in context: http://grizzly.1045725.n5.nabble.com/Grizzly-http-client-race-condition-tp5710847p5710901.html
> Sent from the Grizzly - Users mailing list archive at Nabble.com.