Implementing custom StreamAlgorithm

Eric Heaton
Hi all,

I wish to implement a custom StreamAlgorithm whose parse() method can
potentially return false if the incoming request has not been received
completely into the input buffer. (For the curious, I'm trying to write a
custom protocol that handles the Typed Parameter Language
(tpl.sourceforge.net)).

However, when my StreamAlgorithm's parse() method returns false, the request
is not processed. I tried inspecting the ContentLengthAlgorithm's parse()
method, but found that this does not seem to work either. I create a simple
SelectorThread for the ContentLengthAlgorithm and POST a request that's
greater than the default buffer size of 8192 bytes. The request is never
processed.

Here's a (hopefully) concise example:

SelectorThread selectorThread = new SelectorThread() {
  protected void initAlgorithm()
  {
    setAlgorithmClassName(
      "com.sun.grizzly.http.algorithms.ContentLengthAlgorithm");
    super.initAlgorithm();
  }
};
selectorThread.setPort(80);
selectorThread.setAdapter( new StaticResourcesAdapter());
selectorThread.initEndpoint();
selectorThread.startEndpoint();

The client is:

URL url = new URL("http://localhost/index.html");
URLConnection conn = url.openConnection();
conn.setDoOutput(true);
PrintWriter writer = new PrintWriter(conn.getOutputStream());
writer.print("param=");
for( int i = 0; i < 8000; i++ )
{
   writer.print("data");
}
writer.close();

InputStream in = conn.getInputStream();
while( in.read() != -1 ) { System.out.print("."); }
System.out.println();

Am I missing some vital piece of configuration here, or am I
misunderstanding how the parse() method is supposed to work? By the way, I'm
using Grizzly 1.5.1 and am relatively new to using it.

Thanks,
-Eric
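
For reference, the POST body in the client above is 6 + 8000 * 4 = 32,006 bytes, so it cannot fit in a single 8192-byte read. The kind of "is the request complete yet?" decision that parse() is expected to make can be sketched in plain java.nio as below. This is only an illustration under stated assumptions, not the code of Grizzly's ContentLengthAlgorithm: the class and method names (RequestCompleteness, isComplete) are made up, and the buffer is assumed to be in write mode, with position() marking the end of the bytes received so far.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of Grizzly. */
final class RequestCompleteness {

    /**
     * Returns true when the bytes written so far (index 0 up to buf.position())
     * hold the full header block plus Content-Length body bytes, and false
     * when more bytes must be read first.
     */
    static boolean isComplete(ByteBuffer buf) {
        // Work on a duplicate so the caller's position and limit are untouched.
        ByteBuffer view = buf.duplicate();
        view.flip();
        // ISO-8859-1 maps one byte to one char, so char offsets equal byte offsets.
        String text = StandardCharsets.ISO_8859_1.decode(view).toString();

        int headerEnd = text.indexOf("\r\n\r\n");
        if (headerEnd < 0) {
            return false; // header block not fully received yet
        }
        int bodyStart = headerEnd + 4;

        int contentLength = 0; // assumption: no Content-Length means no body
        for (String line : text.substring(0, headerEnd).split("\r\n")) {
            int colon = line.indexOf(':');
            if (colon > 0 && line.substring(0, colon).trim()
                    .equalsIgnoreCase("Content-Length")) {
                // A malformed value throws NumberFormatException in this sketch.
                contentLength = Integer.parseInt(line.substring(colon + 1).trim());
            }
        }
        return text.length() - bodyStart >= contentLength;
    }
}
```

Decoding the whole buffer on every call is wasteful; a real implementation would remember where the headers ended and only count the newly arrived body bytes.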

---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Re: Implementing custom StreamAlgorithm

D. J. Hagberg (Sun)
Eric Heaton wrote:
 > I wish to implement a custom StreamAlgorithm whose parse() method can
 > potentially return false if the incoming request has not been received
 > completely into the input buffer. (For the curious, I'm trying to
 > write a custom protocol that handles the Typed Parameter Language
 > (tpl.sourceforge.net)).
 >
 > However, when my StreamAlgorithm's parse() method returns false, the
 > request is not processed. I tried inspecting the
 > ContentLengthAlgorithm's parse() method, but found that this does not
 > seem to work either. I create a simple SelectorThread for the
 > ContentLengthAlgorithm and POST a request that's greater than the
 > default buffer size of 8192 bytes. The request is never processed.

Eric --

I hope to post my design work up on my blog soon
(http://blogs.sun.com/djhagberg) but I decided to cut out the
StreamAlgorithm pattern in my application (it seemed like it was more
tightly coupled to http-type protocols) and went with the lower-level
ProtocolFilter API.  I have the SSLReadFilter in front of this to decode
SSL into cleartext bytes...

My Messages have a fixed-length MessageHeader followed by a
variable-length byte[] content.

There are a few methods here that deal with partial message reads:

- prepareParseBuffer prepends any remaining bytes from the last read
   onto the current ByteBuffer.

- checkRemainderHeader checks for a previously-read MessageHeader

- clearRemainder removes any stored remainder buffer and header from
   the session.

- preserveRemainder preserves any leftover MessageHeader or partially-
   read bytes.

I can't say this code has passed all regression/unit tests yet but it
might help.

Note it also depends on an SSLSession to act as a Map for partial reads.
  If you don't have an SSLSession, you may have something else attached
to or associated with your SelectionKey that could be used to stow
remaining stuff...
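
For the no-SSLSession case, one option that needs nothing beyond java.nio is the SelectionKey's own attachment slot (SelectionKey.attach() and attachment()). The sketch below is an illustration only: RemainderState and its fields are hypothetical names, and it assumes nothing else (including the framework version you run on) already uses the key's attachment, which is worth checking first.

```java
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;

/** Hypothetical per-connection holder for leftover bytes; not a Grizzly class. */
final class RemainderState {
    ByteBuffer remainder;   // bytes left over from the previous read, in write mode
    Object pendingHeader;   // a header decoded earlier whose content has not fully arrived

    /** Returns the state attached to the key, creating and attaching it on first use. */
    static RemainderState of(SelectionKey key) {
        Object attached = key.attachment();
        if (attached instanceof RemainderState) {
            return (RemainderState) attached;
        }
        RemainderState state = new RemainderState();
        key.attach(state); // assumption: the attachment slot is free for our use
        return state;
    }

    /** Drops any stored state, for example before the connection is cancelled. */
    static void clear(SelectionKey key) {
        if (key.attachment() instanceof RemainderState) {
            key.attach(null);
        }
    }
}
```

A filter would call RemainderState.of(ctx.getSelectionKey()) where the code below calls session.getValue(...), and RemainderState.clear(...) on the error path.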

public class MessageProtocolFilter implements ProtocolFilter {
     . . . statics, logger, etc. . . .
     public static final String REMBUF = "REMBUF";
     public static final String REMHEAD = "REMHEAD";

     /**
      * Decode raw cleartext bytes into one or more Messages
      * if possible, and hand off to dispatcher to do real work.
      */
     public boolean execute(Context ctx) {
         WorkerThreadImpl workerThread = (WorkerThreadImpl)Thread.
                 currentThread();
         SSLSession session = workerThread.getSSLEngine().getSession();
         ByteBuffer buf = prepareParseBuffer(workerThread, session);

         // Keep parsing while there is a possibility of decoding msgs
         int remain;
         boolean hadError = false;
         Message m = null;
         MessageHeader mh = checkRemainderHeader(session);
         while( (remain=buf.limit()-buf.position()) > 0 ) {

             // Have enough bytes to read a new header?
             if( mh == null && remain < MessageHeader.SIZE ) {
                 break;

             // Read a new header if we need it
             } else if( mh == null ) {
                 mh = MessageHeader.readHeader(buf);
                 // check if un-parseable header
                 if( !isValidHeader(mh) ) {
                     log.severe("Invalid msg header: "+mh);
                     hadError = true;
                     break;
                 }
             }

             // check if remainder is too small for message contents
             remain = buf.limit()-buf.position();
             if( remain < mh.getContentLength() ) {
                 break;
             }

             // build message with header content bytes, dispatch msg
             byte[] body =  new byte[mh.getContentLength()];
             buf.get(body);
             m = new Message(mh, body);
             if( !dispatcher.handleMessage(ctx.getSelectionKey(), m) ) {
                 hadError = true;
                 break;
             }

             m = null;
             mh = null;
         }

         // If we had an error, need to shut down socket connection
         if( hadError ) {
             ctx.setKeyRegistrationState(Context.
                     KeyRegistrationState.CANCEL);
             clearRemainder(session);
         } else {
             ctx.setKeyRegistrationState(Context.
                     KeyRegistrationState.REGISTER);
             preserveRemainder(session, buf, mh);
         }

         // Ensure the "working" buffer is cleared before returning
         workerThread.getByteBuffer().clear();

         return !hadError;
     }

     public boolean postExecute(Context ctx){
         return true;
     }

     /**
      * Check if there are any remaining bytes from a previous read
      * that just had a partial message header or message content.
      */
     private ByteBuffer prepareParseBuffer(WorkerThreadImpl workerThread,
             SSLSession session) {
         ByteBuffer bb = workerThread.getByteBuffer();
         ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
         if( rem != null && rem.position() > 0 ) {
             // concatenate remainder with new bytes to be parsed
             bb.flip();
             int cnt = bb.limit() - bb.position();
             int space = rem.limit() - rem.position();
             // Make remainder buffer bigger if necessary; it must hold the
             // bytes already saved (rem.position()) plus the new ones (cnt)
             if( space < cnt ) {
                 ByteBuffer rem2 = ByteBuffer.allocate(rem.position() +
                         cnt + 512);
                 rem.flip();
                 rem2.put(rem);
                 rem.clear();
                 rem = rem2;
                 session.putValue(REMBUF, rem);
             }
             rem.put(bb);
             bb.clear();
             bb = rem;
         }
         bb.flip();  // prepare for read
         return bb;
     }

     /**
      * Check if last time we were able to decode a header but not
      * able to read the full message contents.
      */
     private MessageHeader checkRemainderHeader(SSLSession session) {
         MessageHeader mh;
         mh = (MessageHeader)session.getValue(REMHEAD);
         if( mh != null ) {
             session.removeValue(REMHEAD);
         }
         return mh;
     }

     /**
      * Remove any "remainder" objects from the SSL session,
      * in preparation for shutting down the connection.
      */
     private void clearRemainder(SSLSession session) {
         session.removeValue(REMHEAD);
         ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
         if( rem != null ) {
             rem.clear();
             session.removeValue(REMBUF);
         }
     }

     /**
      * Preserve any partially-read bytes or header and partial content.
      */
     private void preserveRemainder(SSLSession session, ByteBuffer bb,
             MessageHeader mh) {
         if( mh == null ) {
             session.removeValue(REMHEAD);
         } else {
             session.putValue(REMHEAD, mh);
         }
         int remainCnt = bb.limit()-bb.position();
         ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
         if( rem != null && rem == bb ) {
             bb.compact(); // automatically prepares for next append
         } else if( rem != null && remainCnt == 0 ) {
             rem.clear();
         } else if( rem != null ) {
             if( rem.limit() != rem.capacity() ) {
                 rem.flip(); // prepare for appending
             }
             if( remainCnt > (rem.limit()-rem.position()) ) {
                 ByteBuffer rem2 = ByteBuffer.allocate(rem.capacity() +
                         remainCnt + 1024);
                 rem.flip(); // switch to read mode
                 rem2.put(rem);
                 rem.clear();
                 rem = rem2;
             }
             rem.put(bb);
         } else if( remainCnt > 0 ) {
             int alloc = 16 * 1024;
             if( alloc < remainCnt ) {
                 alloc = remainCnt + 2*1024;
             }
             rem = ByteBuffer.allocate(alloc);
             rem.put(bb);
         }
         if( rem != null ) {
             session.putValue(REMBUF, rem);
         }
     }

     . . . More housekeeping methods, logging, etc ...
}
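
The partial-read handling in the filter above can be tried out without Grizzly or SSL. The following is a minimal, self-contained sketch of the same idea (append new bytes to a remainder buffer, decode every complete message, keep the leftover with compact()). It assumes a made-up wire format, a 4-byte big-endian length followed by that many content bytes, in place of MessageHeader; FrameAccumulator, feed and bufferedBytes are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical framing: 4-byte big-endian length, then that many content bytes. */
final class FrameAccumulator {
    static final int HEADER_SIZE = 4;

    private ByteBuffer pending = ByteBuffer.allocate(1024); // always left in write mode

    /** Appends newly read bytes and returns every message body that is now complete. */
    List<byte[]> feed(ByteBuffer incoming) {
        ensureSpace(incoming.remaining());
        pending.put(incoming);

        List<byte[]> messages = new ArrayList<>();
        pending.flip(); // switch to read mode
        while (pending.remaining() >= HEADER_SIZE) {
            int length = pending.getInt(pending.position()); // peek, position unchanged
            if (length < 0) {
                // Unparseable header: the caller should drop the connection
                // and discard this accumulator.
                throw new IllegalStateException("Invalid length: " + length);
            }
            if (pending.remaining() - HEADER_SIZE < length) {
                break; // content not fully received yet
            }
            pending.position(pending.position() + HEADER_SIZE);
            byte[] body = new byte[length];
            pending.get(body);
            messages.add(body);
        }
        pending.compact(); // keep leftover bytes and return to write mode
        return messages;
    }

    /** Grows the buffer so that the saved bytes plus 'needed' new bytes fit. */
    private void ensureSpace(int needed) {
        if (pending.remaining() >= needed) {
            return;
        }
        ByteBuffer bigger = ByteBuffer.allocate(pending.position() + needed + 512);
        pending.flip();
        bigger.put(pending);
        pending = bigger;
    }

    /** Number of bytes currently held back, waiting for the rest of a message. */
    int bufferedBytes() {
        return pending.position();
    }
}
```

One accumulator per connection is needed, so it has to live wherever the per-connection state is kept (the SSLSession in the filter above). Peeking at the length instead of consuming it avoids having to store a separately decoded header between reads, at the cost of re-reading four bytes.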

Re: Implementing custom StreamAlgorithm

Jeanfrancois Arcand-2
In reply to this post by Eric Heaton
Hi Eric,

Eric Heaton wrote:

> Hi all,
>
> I wish to implement a custom StreamAlgorithm whose parse() method can
> potentially return false if the incoming request has not been received
> completely into the input buffer. (For the curious, I'm trying to write
> a custom protocol that handles the Typed Parameter Language
> (tpl.sourceforge.net)).
>
> However, when my StreamAlgorithm's parse() method returns false, the
> request is not processed. I tried inspecting the
> ContentLengthAlgorithm's parse() method, but found that this does not
> seem to work either. I create a simple SelectorThread for the
> ContentLengthAlgorithm and POST a request that's greater than the
> default buffer size of 8192 bytes. The request is never processed.

Hum...looks like a bug I've introduced in 1.5, as I've just tried with
1.0 and it worked.

> Am I missing some vital piece of configuration here, or am I
> misunderstanding how the parse() method is supposed to work? By the way,
> I'm using Grizzly 1.5.1 and am relatively new to using it.

No, you are absolutely right. The current http submodule has a bug that
prevents the StreamAlgorithm from working as expected. But why do you need
to use the http submodule for your project? Would it be easier to use the
framework classes directly? That way you avoid the http overhead.

I will fix the StreamAlgorithm so it works with http again, as it did in 1.0.

Thanks

-- Jeanfrancois

Re: Implementing custom StreamAlgorithm

Jeanfrancois Arcand-2
In reply to this post by D. J. Hagberg (Sun)
Hi D.J.

D. J. Hagberg (Sun) wrote:

> Eric Heaton wrote:
>  > I wish to implement a custom StreamAlgorithm whose parse() method can
>  > potentially return false if the incoming request has not been received
>  > completely into the input buffer. (For the curious, I'm trying to
>  > write a custom protocol that handles the Typed Parameter Language
>  > (tpl.sourceforge.net)).
>  >
>  > However, when my StreamAlgorithm's parse() method returns false, the
>  > request is not processed. I tried inspecting the
>  > ContentLengthAlgorithm's parse() method, but found that this does not
>  > seem to work either. I create a simple SelectorThread for the
>  > ContentLengthAlgorithm and POST a request that's greater than the
>  > default buffer size of 8192 bytes. The request is never processed.
>
> Eric --
>
> I hope to post my design work up on my blog soon
> (http://blogs.sun.com/djhagberg) but I decided to cut out the
> StreamAlgorithm pattern in my application (it seemed like it was more
> tightly coupled to http-type protocols) and went with the lower-level
> ProtocolFilter API.  I have the SSLReadFilter in front of this to decode
> SSL into cleartext bytes...

Good :-) The http module should really be used, if possible, only to
improve/support the http protocol (that was the main reason why we have
dropped active development on Grizzly 1.0).

>
> My Message's have a fixed-length MessageHeader followed by a
> variable-length byte[] content.
>
> There are a few methods here that deal with partial message reads:
>
> - prepareParseBuffer prepends any remaining bytes from the last read
>   onto the current ByteBuffer.
>
> - checkRemainderHeader checks for a previously-read MessageHeader
>
> - clearRemainder does what it says
>
> - preserveRemainder preserves any leftover MessageHeader or partially-
>   read bytes.
>
> I can't say this code has passed all regression/unit tests yet but it
> might help.
>
> Note it also depends on an SSLSession to act as a Map for partial reads.
>  If you don't have an SSLSession, you may have something else attached
> to or associated with your SelectionKey that could be used to stow
> remaining stuff...
>
> public class MessageProtocolFilter implements ProtocolFilter {
>     . . .

This looks really good. Once you think it is stable enough, would you
agree to give it to the community? I think people would like to have such
a filter available in the default distribution. Let me know, and I can try
to work on a clear-text version of your filter (removing the SSLSession
and adding a Grizzly Session object that can hold state when more bytes
are required).
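
As a rough idea of what such a clear-text replacement could look like, here is a minimal sketch of a per-connection attribute store with the same getValue/putValue/removeValue shape the filter already uses on SSLSession. It is not Grizzly's Session class: ConnectionSessions and its methods are hypothetical, and it assumes the caller invokes close() when a connection goes away.

```java
import java.nio.channels.SelectionKey;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a per-connection session; not a Grizzly class. */
final class ConnectionSessions {
    // SelectionKey uses identity equality, so each connection gets its own entry.
    private final Map<SelectionKey, Map<String, Object>> sessions =
            new ConcurrentHashMap<>();

    /** Returns the named value for this connection, or null if none is stored. */
    Object getValue(SelectionKey key, String name) {
        Map<String, Object> session = sessions.get(key);
        return session == null ? null : session.get(name);
    }

    /** Stores a non-null value under the given name for this connection. */
    void putValue(SelectionKey key, String name, Object value) {
        sessions.computeIfAbsent(key, k -> new ConcurrentHashMap<>()).put(name, value);
    }

    /** Removes the named value, if present. */
    void removeValue(SelectionKey key, String name) {
        Map<String, Object> session = sessions.get(key);
        if (session != null) {
            session.remove(name);
        }
    }

    /** Must be called when the connection closes, otherwise entries leak. */
    void close(SelectionKey key) {
        sessions.remove(key);
    }
}
```

With something like this, the filter's helper methods could take the SelectionKey instead of the SSLSession and otherwise stay much the same.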

Thanks

-- Jeanfrancois



> }

Re: Implementing custom StreamAlgorithm

Eric Heaton
In reply to this post by Jeanfrancois Arcand-2
Jeanfrancois,

 > Jeanfrancois wrote:
>
> No you are absolutely right. The current http sub module has a bug that
> prevent the StreamAlgorithm to work as expected. Now why do you need to
> use the http submodule for your project? Would it be easier to use
> directly the framework classes? This way you don't have the http overhead.
>

I've only been looking at Grizzly for a couple of days, and had completely
missed the possibility of directly using a Controller to implement my custom
protocol, instead of subclassing SelectorThread. I'll change my approach
(with the excellent ProtocolFilter headstart that D.J. posted) and not worry
about the 'http' module.

Thanks so much for the quick response!
-Eric

Re: Implementing custom StreamAlgorithm

Eric Heaton
In reply to this post by D. J. Hagberg (Sun)
D.J.,

This is exactly the response I needed! I'll start with the Controller from
the 'grizzly' framework and implement a custom ProtocolFilter as you describe
below. I don't need SSL support, so for partial reads, I think I'll instead
try a custom Pipeline that creates a custom WorkerThreadImpl that contains
the partially read message header/body.

Thanks!
-Eric

----- Original Message -----
From: "D. J. Hagberg (Sun)" <[hidden email]>
To: <[hidden email]>
Sent: Tuesday, June 19, 2007 4:28 PM
Subject: Re: Implementing custom StreamAlgorithm


> Eric Heaton wrote:
> > I wish to implement a custom StreamAlgorithm whose parse() method can
> > potentially return false if the incoming request has not been received
> > completely into the input buffer. (For the curious, I'm trying to
> > write a custom protocol that handles the Typed Parameter Language
> > (tpl.sourceforge.net)).
> >
> > However, when my StreamAlgorithm's parse() method returns false, the
> > request is not processed. I tried inspecting the
> > ContentLengthAlgorithm's parse() method, but found that this does not
> > seem to work either. I create a simple SelectorThread for the
> > ContentLengthAlgorithm and POST a request that's greater than the
> > default buffer size of 8192 bytes. The request is never processed.
>
> Eric --
>
> I hope to post my design work up on my blog soon
> (http://blogs.sun.com/djhagberg) but I decided to cut out the
> StreamAlgorithm pattern in my application (it seemed like it was more
> tightly coupled to http-type protocols) and went with the lower-level
> ProtocolFilter API.  I have the SSLReadFilter in front of this to decode
> SSL into cleartext bytes...
>
> My Message's have a fixed-length MessageHeader followed by a
> variable-length byte[] content.
>
> There are a few methods here that deal with partial message reads:
>
> - prepareParseBuffer prepends any remaining bytes from the last read
>   onto the current ByteBuffer.
>
> - checkRemainderHeader checks for a previously-read MessageHeader
>
> - clearRemainder does what it says
>
> - preserveRemainder preserves any leftover MessageHeader or partially-
>   read bytes.
>
> I can't say this code has passed all regression/unit tests yet but it
> might help.
>
> Note it also depends on an SSLSession to act as a Map for partial reads.
> If you don't have an SSLSession, you may have something else attached to
> or associated with your SelectionKey that could be used to stow remaining
> stuff...
>
> public class MessageProtocolFilter implements ProtocolFilter {
>     . . .
> }
>

Re: Implementing custom StreamAlgorithm

Jeanfrancois Arcand-2
Hi Eric,

Eric Heaton wrote:
> D.J.,
>
> This is exactly the response I needed! I'll start with the Controller
> from the 'grizzly' framework and implement a custom ProtocolFilter as you
> describe below. I don't need SSL support, so for partial reads, I think
> I'll instead try a custom Pipeline that creates a custom
> WorkerThreadImpl that contains the partially read message header/body.

Let us know the result and if you think your Filter can be donated to
the community, feel free to send it :-)

Thanks!

-- Jeanfrancois




Re: Implementing custom StreamAlgorithm

Eric Heaton
Jeanfrancois,

As it turns out, I decided to follow D.J.'s suggestion and instead store my
partially-read message as an attachment on the SelectionKey. This didn't
work initially, because the standard cleartext ReadFilter that I was using
earlier in the ProtocolChain was doing a key.attach(null) first thing in its
execute() method. So, I replaced this ReadFilter with another version of a
"read" filter that didn't remove the SelectionKey attachment, and things are
working just peachy now.
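
A minimal sketch of the attachment approach described above, using only plain java.nio. The class names PartialMessage and KeyState are hypothetical and are not part of Grizzly. It assumes that no earlier filter in the chain clears the attachment:

```java
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;

/** Hypothetical per-connection state kept as the SelectionKey attachment. */
final class PartialMessage {
    // Always left in "write" mode: position marks the end of the stored bytes.
    private ByteBuffer pending = ByteBuffer.allocate(64);

    /** Appends freshly read bytes (src must be in read mode), growing if needed. */
    void append(ByteBuffer src) {
        if (pending.remaining() < src.remaining()) {
            int needed = pending.position() + src.remaining();
            ByteBuffer bigger = ByteBuffer.allocate(
                    Math.max(pending.capacity() * 2, needed));
            pending.flip();        // switch to read mode to copy the old bytes
            bigger.put(pending);
            pending = bigger;      // the new buffer is already in write mode
        }
        pending.put(src);
    }

    /** Number of bytes stored so far. */
    int size() {
        return pending.position();
    }
}

final class KeyState {
    /** Returns the state attached to the key, attaching a fresh one on first use. */
    static PartialMessage stateOf(SelectionKey key) {
        PartialMessage state = (PartialMessage) key.attachment();
        if (state == null) {
            state = new PartialMessage();
            key.attach(state);
        }
        return state;
    }
}
```

Because the state travels with the key rather than with a thread, it should not matter which worker thread handles the next read for that connection.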

I don't know if my ProtocolFilter is ready for prime-time yet, since I've
got all of my protocol-specific parsing code inside the MessageFilter. Other
than that, it looks very much like D.J.'s implementation (thanks again,
D.J.!). The ideal thing might be to extract all of the protocol-specific
parsing into a MessageParser interface that could be injected into the
MessageFilter... I'll try and think about this after my deadline.
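
One way the extraction might look, as a sketch only. MessageParser is the name floated above, but its signature here is an assumption, and LengthPrefixedParser and MessageDecoder are hypothetical. The wire format (4-byte length prefix followed by UTF-8 text) is made up for illustration and is not TPL:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser contract: protocol-specific code lives behind it. */
interface MessageParser<T> {
    /**
     * Tries to decode one message from buf (in read mode).
     * Returns null, leaving buf's position untouched, when more bytes are needed.
     */
    T parse(ByteBuffer buf);
}

/** Example protocol assumed for illustration: 4-byte length, then UTF-8 text. */
final class LengthPrefixedParser implements MessageParser<String> {
    public String parse(ByteBuffer buf) {
        if (buf.remaining() < 4) {
            return null;                          // length prefix incomplete
        }
        int len = buf.getInt(buf.position());     // absolute get: consumes nothing
        if (len < 0) {
            throw new IllegalArgumentException("negative length: " + len);
        }
        if (buf.remaining() < 4 + len) {
            return null;                          // body incomplete
        }
        buf.getInt();                             // now consume the prefix
        byte[] body = new byte[len];
        buf.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }
}

/** Protocol-agnostic driver, standing in for the filter's parse loop. */
final class MessageDecoder<T> {
    private final MessageParser<T> parser;

    MessageDecoder(MessageParser<T> parser) {
        this.parser = parser;
    }

    /** Decodes every complete message; leftover bytes stay in buf for the next read. */
    List<T> decodeAll(ByteBuffer buf) {
        List<T> out = new ArrayList<>();
        T msg;
        while ((msg = parser.parse(buf)) != null) {
            out.add(msg);
        }
        return out;
    }
}
```

With this split, the filter would only decide where the leftover bytes are kept between reads, and a different protocol would need only a different MessageParser.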

I wonder if the "correct" thing for me to be doing is to store state across
calls to execute() in the SelectionKey? My initial idea to use a custom
WorkerThreadImpl that held on to the state wouldn't work, right, because the
WorkerThreads are not assigned to a specific SelectionKey (or are they?).

-Eric



Re: Implementing custom StreamAlgorithm

Jeanfrancois Arcand-2
Hi Eric

Eric Heaton wrote:

> Jeanfrancois,
>
> As it turns out, I decided to follow D.J.'s suggestion and instead store
> my partially-read message as an attachment on the SelectionKey. This
> didn't work initially, because the standard cleartext ReadFilter that I
> was using earlier in the ProtocolChain was doing a key.attach(null)
> first thing in its execute() method. So, I replaced this ReadFilter with
> another version of a "read" filter that didn't remove the SelectionKey
> attachment, and things are working just peachy now.
>
> I don't know if my ProtocolFilter is ready for prime-time yet, since
> I've got all of my protocol-specific parsing code inside the
> MessageFilter. Other than that, it looks very much like D.J.'s
> implementation (thanks again, D.J.!). The ideal thing might be to
> extract all of the protocol-specific parsing into a MessageParser
> interface that could be injected into the MessageFilter... I'll try and
> think about this after my deadline.
>
> I wonder if the "correct" thing for me to be doing is to store state
> across calls to execute() in the SelectionKey?

Yes, it is, but make sure the attached object is removed when the key is
canceled, to avoid a potential memory leak:

http://weblogs.java.net/blog/jfarcand/archive/2006/06/tricks_and_tips.html
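
As a rough illustration of that cleanup with plain java.nio (KeyCleanup is a hypothetical helper, not a Grizzly class, and the linked article may recommend something different):

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;

final class KeyCleanup {
    /**
     * Detaches the per-connection state, cancels the key, and closes its
     * channel. Returns the detached object so the caller can recycle it.
     */
    static Object cancelAndRelease(SelectionKey key) throws IOException {
        Object old = key.attach(null);   // attach() returns the previous attachment
        key.cancel();
        key.channel().close();
        return old;
    }
}
```

Clearing the attachment explicitly means the buffered bytes become unreachable even if something else still holds a reference to the canceled key.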

> My initial idea to use a custom WorkerThreadImpl that held on to the
> state wouldn't work, right, because the WorkerThreads are not assigned
> to a specific SelectionKey (or are they?).

Yes, that's a good idea. I will propose a generic way today to handle
partial reads and attachments to the SelectionKey. If all goes well I
should have it by the end of the day, so you should be able to try it.
The SailFin project has been asking about that feature for weeks :-)

Thanks for the feedback!

-- Jeanfrancois


>
> -Eric
>
> ----- Original Message ----- From: "Jeanfrancois Arcand"
> <[hidden email]>
> To: <[hidden email]>
> Sent: Wednesday, June 20, 2007 8:43 AM
> Subject: Re: Implementing custom StreamAlgorithm
>
>
>> Hi Eric,
>>
>> Eric Heaton wrote:
>>> D.J.,
>>>
>>> This is exactly the response I needed! I'll start with the Controller
>>> from the 'grizzly' framework and implement a custom ProtocolFilter as
>>> you describe below. I don't need SSL support, so for partial reads, I
>>> think I'll instead try a custom Pipeline that creates a custom
>>> WorkerThreadImpl that contains the partially read message header/body.
>>
>> Let us know the result and if you think your Filter can be donated to
>> the community, feel free to send it :-)
>>
>> Thanks!
>>
>> -- Jeanfrancois
>>
>>
>>>
>>> Thanks!
>>> -Eric
>>>
>>> ----- Original Message ----- From: "D. J. Hagberg (Sun)"
>>> <[hidden email]>
>>> To: <[hidden email]>
>>> Sent: Tuesday, June 19, 2007 4:28 PM
>>> Subject: Re: Implementing custom StreamAlgorithm
>>>
>>>
>>>> Eric Heaton wrote:
>>>> > I wish to implement a custom StreamAlgorithm whose parse() method can
>>>> > potentially return false if the incoming request has not been received
>>>> > completely into the input buffer. (For the curious, I'm trying to
>>>> > write a custom protocol that handles the Typed Parameter Language
>>>> > (tpl.sourceforge.net)).
>>>> >
>>>> > However, when my StreamAlgorithm's parse() method returns false, the
>>>> > request is not processed. I tried inspecting the
>>>> > ContentLengthAlgorithm's parse() method, but found that this does not
>>>> > seem to work either. I create a simple SelectorThread for the
>>>> > ContentLengthAlgorithm and POST a request that's greater than the
>>>> > default buffer size of 8192 bytes. The request is never processed.
>>>>
>>>> Eric --
>>>>
>>>> I hope to post my design work up on my blog soon
>>>> (http://blogs.sun.com/djhagberg) but I decided to cut out the
>>>> StreamAlgorithm pattern in my application (it seemed like it was
>>>> more tightly coupled to http-type protocols) and went with the
>>>> lower-level ProtocolFilter API.  I have the SSLReadFilter in front
>>>> of this to decode SSL into cleartext bytes...
>>>>
>>>> My Message's have a fixed-length MessageHeader followed by a
>>>> variable-length byte[] content.
>>>>
>>>> There are a few methods here that deal with partial message reads:
>>>>
>>>> - prepareParseBuffer prepends any remaining bytes from the last read
>>>>   onto the current ByteBuffer.
>>>>
>>>> - checkRemainderHeader checks for a previously-read MessageHeader
>>>>
>>>> - clearRemainder does what it says
>>>>
>>>> - preserveRemainder preserves any leftover MessageHeader or partially-
>>>>   read bytes.
>>>>
>>>> I can't say this code has passed all regression/unit tests yet but
>>>> it might help.
>>>>
>>>> Note it also depends on an SSLSession to act as a Map for partial
>>>> reads. If you don't have an SSLSession, you may have something else
>>>> attached to or associated with your SelectionKey that could be used
>>>> to stow remaining stuff...
>>>>
>>>> public class MessageProtocolFilter implements ProtocolFilter {
>>>>     . . . statics, logger, etc. . . .
>>>>     public static final String REMBUF = "REMBUF";
>>>>     public static final String REMHEAD = "REMHEAD";
>>>>
>>>>     /**
>>>>      * Decode raw cleartext bytes into one or more Message's
>>>>      * if possible, and hand off to dispatcher to do real work.
>>>>      */
>>>>     public boolean execute(Context ctx) {
>>>>         WorkerThreadImpl workerThread = (WorkerThreadImpl)Thread.
>>>>                 currentThread();
>>>>         SSLSession session = workerThread.getSSLEngine().getSession();
>>>>         ByteBuffer buf = prepareParseBuffer(workerThread, session);
>>>>
>>>>         // Keep parsing while there is a possibility of decoding msgs
>>>>         int remain;
>>>>         boolean hadError = false;
>>>>         Message m = null;
>>>>         MessageHeader mh = checkRemainderHeader(session);
>>>>         while( (remain=buf.limit()-buf.position()) > 0 ) {
>>>>
>>>>             // Have enough bytes to read a new header?
>>>>             if( mh == null && remain < MessageHeader.SIZE ) {
>>>>                 break;
>>>>
>>>>             // Read a new header if we need it
>>>>             } else if( mh == null ) {
>>>>                 mh = MessageHeader.readHeader(buf);
>>>>                 // check if un-parseable header
>>>>                 if( !isValidHeader(mh) ) {
>>>>                     log.severe("Invalid msg header: "+mh);
>>>>                     hadError = true;
>>>>                     break;
>>>>                 }
>>>>             }
>>>>
>>>>             // check if remainder is too small for message contents
>>>>             remain = buf.limit()-buf.position();
>>>>             if( remain < mh.getContentLength() ) {
>>>>                 break;
>>>>             }
>>>>
>>>>             // build message with header content bytes, dispatch msg
>>>>             byte[] body =  new byte[mh.getContentLength()];
>>>>             buf.get(body);
>>>>             m = new Message(mh, body);
>>>>             if( !dispatcher.handleMessage(ctx.getSelectionKey(), m) ) {
>>>>                 hadError = true;
>>>>                 break;
>>>>             }
>>>>
>>>>             m = null;
>>>>             mh = null;
>>>>         }
>>>>
>>>>         // If we had an error, need to shut down socket connection
>>>>         if( hadError ) {
>>>>             ctx.setKeyRegistrationState(Context.
>>>>                     KeyRegistrationState.CANCEL);
>>>>             clearRemainder(session);
>>>>         } else {
>>>>             ctx.setKeyRegistrationState(Context.
>>>>                     KeyRegistrationState.REGISTER);
>>>>             preserveRemainder(session, buf, mh);
>>>>         }
>>>>
>>>>         // Ensure the "working" buffer is cleared before returning
>>>>         workerThread.getByteBuffer().clear();
>>>>
>>>>         return !hadError;
>>>>     }
>>>>
>>>>     public boolean postExecute(Context ctx){
>>>>         return true;
>>>>     }
>>>>
>>>>     /**
>>>>      * Check if there are any remaining bytes from a previous read
>>>>      * that just had a partial message header or message content.
>>>>      */
>>>>     private ByteBuffer prepareParseBuffer(WorkerThreadImpl workerThread,
>>>>             SSLSession session) {
>>>>         ByteBuffer bb = workerThread.getByteBuffer();
>>>>         ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
>>>>         if( rem != null && rem.position() > 0 ) {
>>>>             // concatenate remainder with new bytes to be parsed
>>>>             bb.flip();
>>>>             int cnt = bb.limit() - bb.position();
>>>>             int space = rem.limit() - rem.position();
>>>>             // Make remainder buffer bigger if necessary
>>>>             if( space < cnt ) {
>>>>                 ByteBuffer rem2 = ByteBuffer.allocate(bb.capacity() +
>>>>                         cnt + 512);
>>>>                 rem.flip();
>>>>                 rem2.put(rem);
>>>>                 rem.clear();
>>>>                 rem = rem2;
>>>>                 session.putValue(REMBUF, rem);
>>>>             }
>>>>             rem.put(bb);
>>>>             bb.clear();
>>>>             bb = rem;
>>>>         }
>>>>         bb.flip();  // prepare for read
>>>>         return bb;
>>>>     }
>>>>
>>>>     /**
>>>>      * Check if last time we were able to decode a header but not
>>>>      * able to read the full message contents.
>>>>      */
>>>>     private MessageHeader checkRemainderHeader(SSLSession session) {
>>>>         MessageHeader mh;
>>>>         mh = (MessageHeader)session.getValue(REMHEAD);
>>>>         if( mh != null ) {
>>>>             session.removeValue(REMHEAD);
>>>>         }
>>>>         return mh;
>>>>     }
>>>>
>>>>     /**
>>>>      * Remove any "remainder" objects from the SSL session,
>>>>      * in preparation for shutting down the connection.
>>>>      */
>>>>     private void clearRemainder(SSLSession session) {
>>>>         session.removeValue(REMHEAD);
>>>>         ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
>>>>         if( rem != null ) {
>>>>             rem.clear();
>>>>             session.removeValue(REMBUF);
>>>>         }
>>>>     }
>>>>
>>>>     /**
>>>>      * Preserve any partially-read bytes or header and partial content.
>>>>      */
>>>>     private void preserveRemainder(SSLSession session, ByteBuffer bb,
>>>>             MessageHeader mh) {
>>>>         if( mh == null ) {
>>>>             session.removeValue(REMHEAD);
>>>>         } else {
>>>>             session.putValue(REMHEAD, mh);
>>>>         }
>>>>         int remainCnt = bb.limit()-bb.position();
>>>>         ByteBuffer rem = (ByteBuffer)session.getValue(REMBUF);
>>>>         if( rem != null && rem == bb ) {
>>>>             bb.compact(); // automatically prepares for next append
>>>>         } else if( rem != null && remainCnt == 0 ) {
>>>>             rem.clear();
>>>>         } else if( rem != null ) {
>>>>             if( rem.limit() != rem.capacity() ) {
>>>>                 rem.flip(); // prepare for appending
>>>>             }
>>>>             if( remainCnt > (rem.limit()-rem.position()) ) {
>>>>                 ByteBuffer rem2 = ByteBuffer.allocate(rem.capacity() +
>>>>                         remainCnt + 1024);
>>>>                 rem.flip(); // switch to read mode
>>>>                 rem2.put(rem);
>>>>                 rem.clear();
>>>>                 rem = rem2;
>>>>             }
>>>>             rem.put(bb);
>>>>         } else if( remainCnt > 0 ) {
>>>>             int alloc = 16 * 1024;
>>>>             if( alloc < remainCnt ) {
>>>>                 alloc = remainCnt + 2*1024;
>>>>             }
>>>>             rem = ByteBuffer.allocate(alloc);
>>>>             rem.put(bb);
>>>>         }
>>>>         if( rem != null ) {
>>>>             session.putValue(REMBUF, rem);
>>>>         }
>>>>     }
>>>>
>>>>     // ... more housekeeping methods, logging, etc. ...
>>>> }
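
For anyone skimming the quoted filter above, the core idea (keep unparsed bytes in a "remainder" buffer between reads, and grow it when a new chunk does not fit) can be shown without Grizzly or SSLSession. The following is only a minimal sketch under stated assumptions: the class name FrameAccumulator, its methods, and the 4-byte length-prefix framing are hypothetical and are not taken from the code in this thread, which uses its own MessageHeader format.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: accumulates partial reads and extracts length-prefixed frames. */
class FrameAccumulator {
    // Leftover bytes from earlier reads, kept in "append" mode
    // (position = number of bytes held, limit = capacity).
    private ByteBuffer rem = ByteBuffer.allocate(16);

    /** Appends newly read bytes, growing the remainder buffer when it is too small. */
    private void append(ByteBuffer incoming) {
        if (rem.remaining() < incoming.remaining()) {
            // Size the new buffer from what is already held plus what is arriving.
            ByteBuffer bigger = ByteBuffer.allocate(
                    rem.position() + incoming.remaining() + 16);
            rem.flip();          // switch to read mode to copy the held bytes
            bigger.put(rem);
            rem = bigger;        // bigger is already in append mode
        }
        rem.put(incoming);
    }

    /** Feeds one chunk (in read mode) and returns every complete frame now available. */
    List<byte[]> feed(ByteBuffer incoming) {
        append(incoming);
        rem.flip();              // switch to read mode for parsing
        List<byte[]> frames = new ArrayList<>();
        while (rem.remaining() >= 4) {
            // Absolute get: peek at the length without consuming it.
            int len = rem.getInt(rem.position());
            if (len < 0) {
                throw new IllegalStateException("negative frame length");
            }
            if (rem.remaining() < 4 + len) {
                break;           // body not fully received yet
            }
            rem.getInt();        // consume the length prefix
            byte[] body = new byte[len];
            rem.get(body);
            frames.add(body);
        }
        rem.compact();           // keep the unread tail, back to append mode
        return frames;
    }

    /** Number of bytes currently held over for the next read. */
    int pending() {
        return rem.position();
    }
}

class RemainderSketch {
    public static void main(String[] args) {
        ByteBuffer msg = ByteBuffer.allocate(9);
        msg.putInt(5).put(new byte[] {'h', 'e', 'l', 'l', 'o'});
        byte[] all = msg.array();

        FrameAccumulator acc = new FrameAccumulator();
        // The first chunk holds the length prefix and part of the body only.
        System.out.println(acc.feed(ByteBuffer.wrap(all, 0, 6)).size());
        // The second chunk completes the frame.
        System.out.println(acc.feed(ByteBuffer.wrap(all, 6, 3)).size());
    }
}
```

The compact() call plays the same role as in preserveRemainder() above: it moves the unread tail to the front of the buffer and leaves it ready for the next append, so a parse step that finds an incomplete message can simply return and wait for more bytes.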
