Read doesn't work

Read doesn't work

Евгений Бушуев
Hello, 


I need help with a problem. I'm trying to write a JSON <-> DB service with a custom NIO connection to my database (no JDBC driver). In short, the code so far looks like this:

public static void main(String[] args) throws IOException {

    TCPNIOTransport tcpTransport = TCPNIOTransportBuilder.newInstance().build();
    tcpTransport.start();

    final FilterChain clientFilterChain = FilterChainBuilder.stateless()
            .add(new TransportFilter())
            .build();

    TCPNIOConnectorHandler connectorHandler = TCPNIOConnectorHandler.builder(tcpTransport)
            .processor(clientFilterChain)
            .build();

    SingleEndpointPool singleEndpointPool = SingleEndpointPool // database connections
            .builder(SocketAddress.class)
            .connectorHandler(connectorHandler)
            .endpointAddress(new InetSocketAddress(InetAddress.getByName("localhost"), 12345))
            .maxPoolSize(10)
            .build();

    HttpServer server = HttpServer.createSimpleServer(null, "localhost", 9090);
    server.getServerConfiguration().addHttpHandler(new MyHandler(singleEndpointPool), "/users");

    server.start();

    System.in.read();
}
Then MyHandler (based on org.glassfish.grizzly.samples.httpserver.nonblockinghandler.UploadHttpHandlerSample):
...
public void service(Request request, org.glassfish.grizzly.http.server.Response response) throws Exception {
    final NIOInputStream in = request.getNIOInputStream();
    response.suspend();
    in.notifyAvailable(new ReadHandler() {
        ................
        @Override
        public void onAllDataRead() throws Exception {
            // get a connection to the DB (this blocks for up to 10 ms)
            GrizzlyFuture<Connection> connectionFuture = singleEndpointPool.take();
            final Connection connection = connectionFuture.get(10, TimeUnit.MILLISECONDS);
            // I didn't find a better place to set timeouts; is there no way to do it through the pool?
            connection.setReadTimeout(1000, TimeUnit.MILLISECONDS);
            connection.setWriteTimeout(1000, TimeUnit.MILLISECONDS);
            // here goes the code that builds dbRq from the HTTP request
            .......................
            connection.write(new ByteBufferWrapper(dbRq), new EmptyCompletionHandler<RecordWriteResult>() {
                .....
                @Override
                public void completed(RecordWriteResult result) {
                    connection.read(new CompletionHandler<ReadResult>() {
                        // HERE IS THE PROBLEM:
                        // neither completed nor failed is called
                    });
                }
            });
        }
    });
}
The first question: is this code an optimal way to write a JSON/DB service?
And if it is, why doesn't it receive anything from the DB? The write works (I can see the records being inserted), and I'm sure the DB returns something, because a "pure NIO" implementation does receive the DB responses.

Best regards, Eugene.

Re: Read doesn't work

oleksiys
Administrator
Hi,

In general the idea is correct. My only concern is that you are trying to make the
HttpHandler I/O non-blocking, while the client-side code inside the HttpHandler is blocking.

I can suggest the following changes:

1) Retrieve the client connection from the pool using a CompletionHandler:

            pool.take(new EmptyCompletionHandler<Connection>() {

                @Override
                public void completed(final Connection connection) {
                    ....
                }
               
            });

2) Connection.read(), as you probably noticed, doesn't work (it works only in a specific case). I'd suggest reworking this part to be non-blocking as well.
The solution could be similar to what is described in the cookbook [1].
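
To illustrate the difference behind point 1 outside of Grizzly, here is a minimal sketch that uses only java.util.concurrent. FakePool and its take() method are hypothetical stand-ins for a connection pool; they are not the SingleEndpointPool API. The sketch contrasts waiting on a future with registering a continuation that runs once the connection is available.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a connection pool; NOT the Grizzly SingleEndpointPool API.
final class FakePool {
    // Hands out a "connection" (just a String here) asynchronously.
    CompletableFuture<String> take() {
        return CompletableFuture.supplyAsync(() -> "connection-1");
    }
}

final class NonBlockingTakeDemo {
    // Blocking style: the calling thread waits until the pool answers.
    static String blockingTake(FakePool pool) {
        return pool.take().join();
    }

    // Callback style: returns immediately; the lambda runs once the connection is ready.
    static CompletableFuture<String> callbackTake(FakePool pool) {
        return pool.take().thenApply(conn -> "request written on " + conn);
    }

    public static void main(String[] args) {
        FakePool pool = new FakePool();
        System.out.println(blockingTake(pool));
        // join() is used here only so that the demo can print the final result.
        System.out.println(callbackTake(pool).join());
    }
}
```

In the blocking variant a worker thread is tied up for the whole wait, which is what defeats the purpose of a non-blocking handler; in the callback variant the thread is free to serve other requests in the meantime.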

I'd also suggest implementing a client-side DbProtocolFilter that decodes DB responses and passes them to the next Filter only when the entire response has been received.
But I can help here only if I know the underlying DB protocol.
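
The "pass it on only when the entire response is received" idea can be sketched without Grizzly as a small accumulating decoder. The DB protocol is not given in this thread, so the framing used here (a 4-byte big-endian length followed by the payload) is purely an assumption, and LengthPrefixedDecoder is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Optional;

// Accumulates raw chunks and yields a payload only once a whole frame has arrived.
// ASSUMED frame layout: 4-byte big-endian length, then that many payload bytes.
final class LengthPrefixedDecoder {
    private ByteBuffer pending = ByteBuffer.allocate(0); // always kept ready for reading

    // Feed one chunk; returns the payload when a frame is complete, otherwise empty.
    Optional<byte[]> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk).flip();
        pending = merged;

        if (pending.remaining() < Integer.BYTES) {
            return Optional.empty();                 // length header is not complete yet
        }
        int length = pending.getInt(pending.position()); // peek without consuming
        if (length < 0) {
            throw new IllegalStateException("negative frame length: " + length);
        }
        if (pending.remaining() < Integer.BYTES + length) {
            return Optional.empty();                 // payload is not complete yet
        }
        pending.getInt();                            // consume the header
        byte[] payload = new byte[length];
        pending.get(payload);                        // consume the payload
        pending = pending.slice();                   // keep bytes that belong to the next frame
        return Optional.of(payload);
    }

    public static void main(String[] args) {
        byte[] frame = ByteBuffer.allocate(9).putInt(5)
                .put("hello".getBytes(StandardCharsets.US_ASCII)).array();
        LengthPrefixedDecoder decoder = new LengthPrefixedDecoder();
        // The frame arrives in two pieces; only the second call yields a payload.
        System.out.println(decoder.feed(Arrays.copyOfRange(frame, 0, 6)).isPresent());
        System.out.println(decoder.feed(Arrays.copyOfRange(frame, 6, 9)).isPresent());
    }
}
```

A real filter would apply the same logic to the buffers it receives and would stop the chain until the frame is complete.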

Thanks.

WBR,
Alexey.

[1] https://grizzly.java.net/cookbook.html

Re: Read doesn't work

Евгений Бушуев
Hi, 

Thanks for helping! But I still can't get it working. I've changed all read/write calls to use async I/O as you suggested and added a filter. So now my code looks like this:




class JsonHttpHandler extends HttpHandler {
  public void service(Request request, Response response) throws Exception {
    final NIOInputStream in = request.getNIOInputStream();
    in.notifyAvailable(new ReadHandler() {
        singleEndpointPool.take(
              new EmptyCompletionHandler<Connection>() {
        );
    });
  } 
}

Re: Read doesn't work

Евгений Бушуев
Sorry, I accidentally sent it; I can't get used to Gmail... To be continued.

Re: Read doesn't work

Евгений Бушуев
Here comes episode 2.

The code:
 
public static void main(String[] args) throws IOException {
        TCPNIOTransport tcpTransport = TCPNIOTransportBuilder.newInstance().build();
        tcpTransport.start();


        final FilterChain clientFilterChain = FilterChainBuilder.stateless()
                .add(new TransportFilter())
                //.add(new DbProtocolFilter())
                .build();

        TCPNIOConnectorHandler connectorHandler = TCPNIOConnectorHandler.builder(tcpTransport)
                .processor(clientFilterChain)
                .build();


        SingleEndpointPool singleEndpointPool = SingleEndpointPool  //database connections
                .builder(SocketAddress.class)
                .connectorHandler(connectorHandler)
                .endpointAddress(new InetSocketAddress(InetAddress.getByName("localhost"), 12345))
                .maxPoolSize(10)
                .build();

        HttpServer server = HttpServer.createSimpleServer(null, "localhost", 9090);
        server.getServerConfiguration().addHttpHandler(new JsonHttpHandler(singleEndpointPool), "/users");
}

class JsonHttpHandler extends HttpHandler {
  public void service(final Request request, final Response response) throws Exception {

    final NIOInputStream in = request.getNIOInputStream();

    in.notifyAvailable(new ReadHandler() {

      // onDataAvailable() and onError() are not shown
      public void onAllDataRead() throws Exception {

        singleEndpointPool.take(

          new EmptyCompletionHandler<Connection>() {

            public void completed(final Connection dbConnection) {

              final AsyncQueueWriter writer = (AsyncQueueWriter) dbConnection.getTransport().getWriter(false);
              // dbRq is the DB request built from the HTTP request (construction not shown)
              writer.write(dbConnection, new ByteBufferWrapper(dbRq), new EmptyCompletionHandler<RecordWriteResult>() {

                public void completed(RecordWriteResult result) {

                  final AsyncQueueReader reader = (AsyncQueueReader) dbConnection.getTransport().getReader(false);
                  reader.read(dbConnection, buffer, new CompletionHandler<ReadResult>() {
                    public void completed(ReadResult result) {

                      response.setStatus(HttpStatus.OK_200);
                      response.finish();
                    }
                  });
                }
              });
            }
          }
        );
      }
    });
  }
}


This works (the reader's CompletionHandler.completed is invoked) only when the immediate call to socketChannel.read in org.glassfish.grizzly.nio.transport.TCPNIOUtils#readSimpleBuffer returns all the response bytes. The trace:
at org.glassfish.grizzly.nio.transport.TCPNIOUtils.readSimpleBuffer(TCPNIOUtils.java:326)
at org.glassfish.grizzly.nio.transport.TCPNIOUtils.readBuffer(TCPNIOUtils.java:285)
at org.glassfish.grizzly.nio.transport.TCPNIOTransport.read(TCPNIOTransport.java:641)
at org.glassfish.grizzly.nio.transport.TCPNIOAsyncQueueReader.read0(TCPNIOAsyncQueueReader.java:68)
at org.glassfish.grizzly.nio.AbstractNIOAsyncQueueReader.doRead(AbstractNIOAsyncQueueReader.java:314)
at org.glassfish.grizzly.nio.AbstractNIOAsyncQueueReader.read(AbstractNIOAsyncQueueReader.java:118)
at org.glassfish.grizzly.AbstractReader.read(AbstractReader.java:82)
at com.niomongo.JsonHttpHandler$1$2.completed(JsonHttpHandler.java:176)
at com.niomongo.JsonHttpHandler$1$2.completed(JsonHttpHandler.java:132)

But if the DB, the network, or anything else is a bit slow and the first read returns 0 bytes, then the reader's CompletionHandler.updated is called instead of completed, which is of course fine. What is not so fine is that the reader's CompletionHandler never receives subsequent events from the selector, because the only processor my dbConnection has is the FilterChain, and I don't have any filters there.
So, the questions:
1) Is it possible to get events delivered to the reader's CompletionHandler (I remember you advised adding a filter, but still)?
And if the answer to question 1 is no, then
2) How should the HttpHandler be notified that DbProtocolFilter has finished reading and the response is ready to be produced? It also needs the data from the filter's FilterChainContext.message.
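
The "first read returned 0 bytes" situation can be reproduced with plain java.nio, independent of Grizzly. The sketch below is only an illustration of the general mechanism, not of Grizzly's internals: it uses a java.nio.channels.Pipe as a stand-in for the DB socket, and PartialReadDemo is a hypothetical name. A non-blocking read on an empty channel returns right away with nothing, and the data is seen only by whoever is subscribed to read-readiness through a selector.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

final class PartialReadDemo {
    // Returns {bytes from the immediate read, bytes from the read after the selector woke up}.
    static int[] run() {
        try {
            Pipe pipe = Pipe.open(); // stand-in for the DB connection
            try (Selector selector = Selector.open();
                 Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                ByteBuffer buffer = ByteBuffer.allocate(64);

                // The "DB" has not answered yet: a non-blocking read returns immediately.
                int first = source.read(buffer);

                // To learn about later data, the channel must be registered for OP_READ.
                source.register(selector, SelectionKey.OP_READ);

                // The "DB" answers a bit later.
                sink.write(ByteBuffer.wrap("reply".getBytes(StandardCharsets.US_ASCII)));

                selector.select(1000); // wait for read-readiness (at most one second)
                int second = source.read(buffer);
                return new int[] {first, second};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("immediate read: " + counts[0] + ", after select: " + counts[1]);
    }
}
```

Whatever component handles the readiness event is the one that gets the bytes, so a one-shot read callback that is not tied to that component will not see data that arrives later.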


Thank you for your help!

Best regards, Eugene.

Re: Read doesn't work

oleksiys
Administrator
Hi Eugene,

So, the questions:
1) Is it possible to get events delivered to the reader's CompletionHandler (I remember you advised adding a filter, but still)?
And if the answer to question 1 is no, then
No, unfortunately it won't work like that. As I said, connection.read() works for only a very limited number of use cases.

2) How should the HttpHandler be notified that DbProtocolFilter has finished reading and the response is ready to be produced? It also needs the data from the filter's FilterChainContext.message.
Here is a sample that should give you an idea.
public class Test {
    private static final Attribute<Request> ASSOCIATED_HTTP_REQ_ATTR =
            Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("DbProtocolFilter");
   
    public static void main(String[] args) throws IOException {

    }

    private final TCPNIOTransport tcpTransport;
    private final SingleEndpointPool singleEndpointPool;

    public Test() {
        final FilterChain clientFilterChain = FilterChainBuilder.stateless()
                .add(new TransportFilter())
                .add(new DbProtocolFilter())
                .build();

        tcpTransport = TCPNIOTransportBuilder.newInstance().build();
       
        TCPNIOConnectorHandler connectorHandler = TCPNIOConnectorHandler.builder(tcpTransport)
                .processor(clientFilterChain)
                .build();
       
        singleEndpointPool = SingleEndpointPool //database connections
                .builder(SocketAddress.class)
                .connectorHandler(connectorHandler)
                .endpointAddress(new InetSocketAddress("localhost", 12345))
                .maxPoolSize(10)
                .build();
       
    }
   
    public void start() throws IOException {
        tcpTransport.start();

        HttpServer server = HttpServer.createSimpleServer(null, "localhost", 9090);
        server.getServerConfiguration().addHttpHandler(
                new JsonHttpHandler(singleEndpointPool),
                "/users");
        server.start(); // start listening, as in the original main()
    }
   
    class JsonHttpHandler extends HttpHandler {

        public void service(final Request request, final Response response)
                throws Exception {

            response.suspend();
           
            final NIOInputStream in = request.getNIOInputStream();

            in.notifyAvailable(new ReadHandler() {

                public void onDataAvailable() throws Exception {
                    throw new UnsupportedOperationException("Not supported yet.");
                }

                public void onError(Throwable t) {
                    throw new UnsupportedOperationException("Not supported yet.");
                }

                public void onAllDataRead() throws Exception {
                    singleEndpointPool.take(
                            new EmptyCompletionHandler<Connection>() {

                                @Override
                                public void completed(final Connection dbConnection) {
                                    ASSOCIATED_HTTP_REQ_ATTR.set(dbConnection, request); // associate the dbConnection with the HTTP request
                                    dbConnection.write(dbRq);
                                }
                            });
                }
            });
        }
    }
   
    class DbProtocolFilter extends BaseFilter {
        @Override
        public NextAction handleRead(FilterChainContext ctx) throws IOException {
            final Connection c = ctx.getConnection();
            final Buffer dbResponseBuffer = ctx.getMessage(); // the message, could be just a part of response
           
            if (!isComplete(dbResponseBuffer)) {
                return ctx.getStopAction(dbResponseBuffer); // wait for more data to come
            }
           
            final Request associatedHttpRequest = ASSOCIATED_HTTP_REQ_ATTR.remove(c);
            assert associatedHttpRequest != null;
            final Response response = associatedHttpRequest.getResponse();
           
            try {
           
                processResponse(associatedHttpRequest, response, dbResponseBuffer);
            } finally {
                singleEndpointPool.release(c);
                response.resume();
            }
           
            return ctx.getStopAction();
        }
    }
}
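
The association idea in the sample above (remember which HTTP request is waiting on which DB connection, then resume it once the full response is in) can also be shown as a small stand-alone sketch. Everything here is hypothetical and uses only the JDK: PendingRequests plays the role of the connection attribute, connections are identified by plain strings, and a response is assumed to be complete when it ends with a newline.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for per-connection attributes; none of these names are Grizzly API.
final class PendingRequests {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final Map<String, StringBuilder> partial = new ConcurrentHashMap<>();

    // Called by the "HTTP handler" right before the DB request is written.
    CompletableFuture<String> register(String connectionId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(connectionId, future);
        return future;
    }

    // Called by the "protocol filter" for every chunk read from the connection.
    // ASSUMED framing: a response is complete once it ends with '\n'.
    void onChunk(String connectionId, String chunk) {
        StringBuilder sb = partial.computeIfAbsent(connectionId, k -> new StringBuilder());
        sb.append(chunk);
        if (sb.length() == 0 || sb.charAt(sb.length() - 1) != '\n') {
            return; // wait for more data to come
        }
        partial.remove(connectionId);
        CompletableFuture<String> future = pending.remove(connectionId);
        if (future != null) {
            future.complete(sb.toString().trim()); // corresponds to resuming the suspended response
        }
    }

    public static void main(String[] args) {
        PendingRequests requests = new PendingRequests();
        CompletableFuture<String> reply = requests.register("db-1");
        reply.thenAccept(body -> System.out.println("HTTP response body: " + body));
        requests.onChunk("db-1", "par");     // incomplete, nothing happens yet
        requests.onChunk("db-1", "tial\n");  // complete, the continuation runs
    }
}
```

Removing the entry at the moment of completion mirrors the remove() call on the attribute in the sample, so a connection returned to the pool carries no stale request.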


Please let me know if you have further questions.

WBR,
Alexey.

Re: Read doesn't work

Евгений Бушуев
Hi Alexey, 

Thank you very much for your help! Now everything works perfectly!

Best regards, Eugene.

Re: Read doesn't work

Евгений Бушуев
Hi Alexey, 

Could you please advise on the best way to check for a read timeout in DbProtocolFilter (please see the reply history for context)? That is, the case where dbConnection.write(dbRq) was executed in JsonHttpHandler but DbProtocolFilter.handleRead doesn't get a complete dbResponseBuffer within a given time. I'm thinking about replacing ASSOCIATED_HTTP_REQ_ATTR with a DelayQueue, similar to how it's done for keepAliveCleaner in SingleEndpointPool. Would that be a good idea?
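
For reference, here is one generic way such a timeout could be sketched with JDK classes only. It uses a ScheduledExecutorService rather than the DelayQueue mentioned above, and it is not tied to Grizzly: TimedPendingRequests and its methods are hypothetical names, and connections are identified by plain strings. The key point is that the response path and the timeout path both try to remove the same pending entry, so exactly one of them wins.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; not part of Grizzly.
final class TimedPendingRequests implements AutoCloseable {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "db-read-timeout");
                t.setDaemon(true); // do not keep the JVM alive just for timeouts
                return t;
            });

    // Called when the DB request is written; arms the read timeout.
    CompletableFuture<String> register(String connectionId, long timeoutMillis) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(connectionId, future);
        timer.schedule(() -> {
            // Time out only if this exact request is still pending.
            if (pending.remove(connectionId, future)) {
                future.completeExceptionally(
                        new TimeoutException("no complete DB response on " + connectionId));
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return future;
    }

    // Called when a complete response has been read.
    // Returns false if the request has already timed out or was never registered.
    boolean complete(String connectionId, String response) {
        CompletableFuture<String> future = pending.remove(connectionId);
        return future != null && future.complete(response);
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }

    public static void main(String[] args) {
        try (TimedPendingRequests requests = new TimedPendingRequests()) {
            CompletableFuture<String> slow = requests.register("db-1", 50);
            slow.exceptionally(t -> "timed out: " + t.getMessage())
                .thenAccept(System.out::println)
                .join();
        }
    }
}
```

In a real service the timeout branch would also have to decide what to do with the connection (for example close it instead of returning it to the pool), because a late response could otherwise be mistaken for the answer to the next request.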

Best regards, Eugene.



2014-11-25 21:20 GMT+01:00 Евгений Бушуев <[hidden email]>:
Hi Alexey, 

Thank you very much for your help! Now everything works perfectly!

Best regards, Eugene.

2014-11-25 0:23 GMT+02:00 Oleksiy Stashok <[hidden email]>:
Hi Eugene,

So, the qustions:
1) Is it possible to get events delivered to reader's CompletionHandler (I remember you advised to add a filter, but still)?
And if anwer to question  1 is not, then
No, unfortunately it won't work like that. As I said connection.read() will work for only very limited number of usecases.

2) How should HttpHandler be notified that DbProtocolFilter filter finished reading and response is ready to be produced. It also needs data from filter's FilterChainContext.message.
Here is the sample, which should give you an idea.
public class Test {
    private static final Attribute<Request> ASSOCIATED_HTTP_REQ_ATTR =
            Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("DbProtocolFilter");
   
    public static void main(String[] args) throws IOException {
        new Test().start();
        System.in.read(); // keep the JVM alive until a key is pressed
    }

    private final TCPNIOTransport tcpTransport;
    private final SingleEndpointPool singleEndpointPool;

    public Test() {
        final FilterChain clientFilterChain = FilterChainBuilder.stateless()
                .add(new TransportFilter())
                .add(new DbProtocolFilter())
                .build();

        tcpTransport = TCPNIOTransportBuilder.newInstance().build();
       
        TCPNIOConnectorHandler connectorHandler = TCPNIOConnectorHandler.builder(tcpTransport)
                .processor(clientFilterChain)
                .build();
       
        singleEndpointPool = SingleEndpointPool //database connections
                .builder(SocketAddress.class)
                .connectorHandler(connectorHandler)
                .endpointAddress(new InetSocketAddress("localhost", 12345))
                .maxPoolSize(10)
                .build();
       
    }
   
    public void start() throws IOException {
        tcpTransport.start();

        HttpServer server = HttpServer.createSimpleServer(null, "localhost", 9090);
        server.getServerConfiguration().addHttpHandler(
                new JsonHttpHandler(), // inner class, so it reads singleEndpointPool directly
                "/users");
        server.start(); // without this the HTTP listener never accepts requests
    }
   
    class JsonHttpHandler extends HttpHandler {

        public void service(final Request request, final Response response)
                throws Exception {

            response.suspend();
           
            final NIOInputStream in = request.getNIOInputStream();

            in.notifyAvailable(new ReadHandler() {

                public void onDataAvailable() throws Exception {
                    // Placeholder: a real handler would consume the available bytes here.
                    throw new UnsupportedOperationException("Not supported yet.");
                }

                public void onError(Throwable t) {
                    // Placeholder: a real handler would report the failure and resume the response.
                    throw new UnsupportedOperationException("Not supported yet.");
                }

                public void onAllDataRead() throws Exception {
                    singleEndpointPool.take(
                            new EmptyCompletionHandler<Connection>() {

                                @Override
                                public void completed(final Connection dbConnection) {
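                                    // Associate the dbConnection with the HTTP request so the filter can find it later.
                                    ASSOCIATED_HTTP_REQ_ATTR.set(dbConnection, request);
                                    dbConnection.write(dbRq); // dbRq stands for the DB request message (its construction is not shown)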
                                }
                            });
                }
            });
        }
    }
   
    class DbProtocolFilter extends BaseFilter {
        @Override
        public NextAction handleRead(FilterChainContext ctx) throws IOException {
            final Connection c = ctx.getConnection();
            final Buffer dbResponseBuffer = ctx.getMessage(); // may hold only part of the response

            // isComplete(...) is a protocol-specific check that is not shown in this sample.
            if (!isComplete(dbResponseBuffer)) {
                return ctx.getStopAction(dbResponseBuffer); // keep the partial data and wait for more
            }
           
            final Request associatedHttpRequest = ASSOCIATED_HTTP_REQ_ATTR.remove(c);
            assert associatedHttpRequest != null;
            final Response response = associatedHttpRequest.getResponse();
           
            try {
           
                processResponse(associatedHttpRequest, response, dbResponseBuffer);
            } finally {
                singleEndpointPool.release(c);
                response.resume();
            }
           
            return ctx.getStopAction();
        }
    }
}
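
isComplete(...) is not defined in the sample, and the thread never describes the database's wire protocol. Purely as an illustration, assuming each response starts with a 4-byte big-endian length followed by that many body bytes (an assumption, not the actual protocol), the check could look like the sketch below. java.nio.ByteBuffer stands in for the Grizzly Buffer so the snippet runs without Grizzly on the classpath, and LengthPrefixedFraming is a made-up name.

```java
import java.nio.ByteBuffer;

// Hypothetical framing helper for an assumed "4-byte length + body" protocol.
final class LengthPrefixedFraming {
    static final int HEADER_BYTES = 4;

    // True if the buffer holds at least one whole frame starting at its current
    // position. Uses an absolute read, so the buffer's position is left untouched.
    static boolean isComplete(ByteBuffer buf) {
        if (buf.remaining() < HEADER_BYTES) {
            return false; // the length header itself has not fully arrived yet
        }
        int bodyLength = buf.getInt(buf.position());
        if (bodyLength < 0) {
            throw new IllegalStateException("negative frame length: " + bodyLength);
        }
        return buf.remaining() - HEADER_BYTES >= bodyLength;
    }

    public static void main(String[] args) {
        ByteBuffer partial = ByteBuffer.allocate(6);
        partial.putInt(5).put((byte) 1).put((byte) 2); // header says 5 bytes, only 2 present
        partial.flip();

        ByteBuffer whole = ByteBuffer.allocate(9);
        whole.putInt(5).put(new byte[5]);              // header plus the full 5-byte body
        whole.flip();

        System.out.println("partial complete? " + isComplete(partial));
        System.out.println("whole complete?   " + isComplete(whole));
    }
}
```

Because the check does not consume anything, the same buffer can be handed back through ctx.getStopAction(dbResponseBuffer) when the frame is still incomplete.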


Please let me know if you have further questions.

WBR,
Alexey.


Re: Read doesn't work

oleksiys
Administrator
Hey,

you can use either a DelayQueue or a standard ScheduledExecutorService to track the DB request timeout. Once the timeout is hit, just close the connection.
In the JsonHttpHandler you'll need to register a CloseListener to be notified when the connection is closed and to resume HTTP processing.

Like:

                public void onAllDataRead() throws Exception {
                    singleEndpointPool.take(
                            new EmptyCompletionHandler<Connection>() {

                                @Override
                                public void completed(final Connection dbConnection) {
                                    // Associate the dbConnection with the HTTP request.
                                    ASSOCIATED_HTTP_REQ_ATTR.set(dbConnection, request);
                                    startTrackDbConnectionTimeout(dbConnection);
                                    // The CloseListener has to resume the HTTP request/response processing.
                                    dbConnection.addCloseListener(...);
                                    dbConnection.write(dbRq);
                                }
                            });
                }

Please don't forget to remove the CloseListener and any other handlers and listeners before returning the Connection to the pool.
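
startTrackDbConnectionTimeout(...) is not shown above. A minimal sketch of the ScheduledExecutorService variant, under stated assumptions, could look like this: a plain java.io.Closeable stands in for the Grizzly Connection, DbRequestTimeout is a made-up helper name, and the timeout values are arbitrary. The AtomicBoolean makes sure that only one side wins when the timeout and the DB response arrive at nearly the same time.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: one instance per in-flight DB request.
final class DbRequestTimeout {
    private final AtomicBoolean finished = new AtomicBoolean();
    private final ScheduledFuture<?> timeoutTask;

    DbRequestTimeout(ScheduledExecutorService scheduler, Closeable dbConnection,
                     long timeout, TimeUnit unit) {
        // If nobody cancels in time, close the connection. In the real handler the
        // CloseListener would then resume the suspended HTTP response.
        this.timeoutTask = scheduler.schedule(() -> {
            if (finished.compareAndSet(false, true)) {
                try {
                    dbConnection.close();
                } catch (IOException ignored) {
                    // the connection is being dropped anyway
                }
            }
        }, timeout, unit);
    }

    // Call when the complete DB response has arrived, before releasing the
    // connection to the pool. Returns false if the timeout has already fired.
    boolean cancel() {
        if (finished.compareAndSet(false, true)) {
            timeoutTask.cancel(false);
            return true;
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            DbRequestTimeout slow = new DbRequestTimeout(scheduler,
                    () -> System.out.println("timeout hit, closing connection"),
                    50, TimeUnit.MILLISECONDS);
            DbRequestTimeout fast = new DbRequestTimeout(scheduler,
                    () -> System.out.println("never printed"),
                    10, TimeUnit.SECONDS);
            System.out.println("fast answered in time: " + fast.cancel());
            Thread.sleep(200); // give the 50 ms timeout a chance to fire
            System.out.println("slow answered in time: " + slow.cancel());
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

In DbProtocolFilter.handleRead, cancel() would be called once the response is complete; only if it returns true should the connection be cleaned up (CloseListener removed) and released to the pool.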

WBR,
Alexey.

Re: Read doesn't work

Евгений Бушуев
ok, it's clear now. Thank you very much!

Best regards, Eugene.
