Bug 61282 - org.apache.catalina.connector.CoyoteInputStream asynchronous read
Status: RESOLVED INVALID
Alias: None
Product: Tomcat 9
Classification: Unclassified
Component: Catalina
Version: 9.0.0.M22
Hardware: PC Linux
Importance: P2 normal
Target Milestone: -----
Assignee: Tomcat Developers Mailing List
Reported: 2017-07-11 15:08 UTC by Jack
Modified: 2017-07-12 08:08 UTC
Description Jack 2017-07-11 15:08:00 UTC
Consider the following byte array, read inside onDataAvailable:
byte[] ab=new byte[4096];
int len=is.read(ab,0,ab.length);

After onDataAvailable returns to the container, the content of ab has changed if len==4096; the content of ab has not changed if len!=4096.


The following is the Java class I used for the test. The issue is explained in the Javadoc of the onComplete method.
===========================


package zede.consult.webapp;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import zede.util.Util;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.json.Json;
import javax.json.stream.JsonGenerator;
import javax.servlet.AsyncListener;

public class BAtest extends HttpServlet {

    static String dirUpload;
    public static String dirTmp, dirTarget, dirTargetPost;

    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

        StringWriter sw = new StringWriter();
        try {
            JsonGenerator g = Json.createGenerator(sw); //out
            g.writeStartObject().write("idReq", 0);
            if (WebApp.writeReason(g, null)) { //no more information
            }
            g.writeEnd();
            g.flush();
        } catch (Throwable t) {
            if (WebApp.debug) {
                t.printStackTrace(WebApp.out);
                WebApp.out.flush();
            }
            try {
                sw = new StringWriter();
                JsonGenerator g = Json.createGenerator(sw); //out
                g.writeStartObject().write("idReq", 0);
                if (WebApp.writeReason(g, null)) { //no more information
                }
                g.writeEnd();
                g.flush();
            } catch (Throwable t2) {
                return;
            }
        }
        try {
            WebApp.sendRes_json(sw.toString(), response);
            //os.close(); we do not open it.
            //PrintWriter out=response.getWriter();
            //out.print(json);out.flush();
        } catch (Throwable ex) {
            //just swallow it //Logger.getLogger(BA.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    @Override
    protected void doPut(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        AsyncContext ac = request.startAsync();
        ACHandler_Put h = new ACHandler_Put(ac, request, response);
        ac.addListener(h);
        ac.start(h);
    }

    class ACHandler_Put implements ReadListener, AsyncListener, Runnable {

        ServletInputStream is;
        int bytesReceived, bytesSaved; //this is different from ut.saved which is got from harddisk, this is instructed to save.
        int saved, size; //int for length is more than enough.
        ConcurrentLinkedQueue<DataPiece> q2save = new ConcurrentLinkedQueue<>();
        public final AsyncContext ac;
        public final HttpServletRequest request;
        public final HttpServletResponse response;
        public int idReq;
        public Ex_Coded4Web exweb;

        public ACHandler_Put(AsyncContext ac, HttpServletRequest request, HttpServletResponse response) {
            this.ac = ac;
            this.request = request;
            this.response = response;
        }

        @Override
        public void run() {
            try {
                String slength = request.getHeader("Content-Length");
                if (slength == null) {
                    //in this case the client's user agent is at fault and we could reject the request,
                    //but to be lenient we accept it; the length is then the one specified in the UpLoadTask
                } else {
                    try {
                        size = Integer.parseInt(slength);
                    } catch (Throwable t) {
                        throw new Ex_Coded4Web(WebApp.Reason_ContentLengthParseFailed, "Content_Length:" + slength + " can not be converted into int");
                    }
                }
                startRead();
                return; //we do not call complete at this time point
            } catch (Ex_Coded4Web e) {
                exweb = e;
            } catch (Throwable t) {
                t.printStackTrace();
                exweb = new Ex_Coded4Web(WebApp.Reason_UncaughtException, t.getMessage());
            }
            complete();
        }

        void startRead() {
            File f = new File("stream.out");
//            System.out.println(f.getAbsolutePath());
//tomcat home or catalina home
            f = new File("/home/jack/Pictures/stream.out");
            try {
                fos = new FileOutputStream(f);
            } catch (FileNotFoundException ex) {
            }

            bytesReceived = bytesSaved = saved;
            try {
                is = request.getInputStream();
//System.out.println(is.getClass().getName());
//org.apache.catalina.connector.CoyoteInputStream
            } catch (IOException ex) {
                exweb = new Ex_Coded4Web(WebApp.Reason_FailedOnReceiving, "unable get uploading stream:" + ex.getMessage());
                complete();
                return;
            }
            //is=java.util.Base64.getDecoder().wrap(is);
            is.setReadListener(ACHandler_Put.this);
        }

        @Override
        public void onDataAvailable() {
            {
                int len; //we have to check the bytes saved in case someone attacks us.
                byte[] ab = null;//q4096.poll();
                if (ab == null) {
                    ab = new byte[4096];
                }
                String msg;
                while (true) {
                    try {
                        if (!is.isReady()) { //this can throw the same EOFException as read.
                            break;
                        }
                        len = is.read(ab, 0, ab.length);
                    } catch (IOException ex) { //read exception
                        //this usually happens when the user refreshes the web page before the upload completes.
                        ex.printStackTrace();
                        msg = "isread/read:" + ex.getMessage();
                        exweb = new Ex_Coded4Web(WebApp.Reason_FailedOnReceiving, msg);
                        complete();
                        return;
                    }
                    if (-1 == len) {
                        //this will not happen, or I did not see it happen
                        //onAllDataRead will be called.
                        //if this happens, then this should have the same effect as onAllDataRead()
                        if (WebApp.debug) {
                            System.out.println("put len=-1");

                        }
                        return;
                    }
//                if(len==0){ //will never happen, otherwise isready is useless
//                    break;
//                }
                    DataPiece dp = new DataPiece(ab, bytesReceived, len); //new DataPiece(ByteBuffer.wrap(ab, 0, len), bytesReceived);
                    bytesReceived += len;
                    if (bytesReceived > size) {
                        msg = "size " + bytesReceived + " is exceeding the inited length:" + size;
                        System.out.println(msg);
                        exweb = new Ex_Coded4Web(WebApp.Reason_SizeExceedDeclaredLength, msg);
                        try {
                            is.close();
                        } catch (Throwable t) { //exweb !=null, so just swallow
                        }
                        complete();
                        return;
                    }
                    //System.out.println("read data +" + len + "=" + bytesReceived + "/" + ut.size + " saved:" + bytesSaved);
                    //I would normally do this asynchronously, but to show that the data is received correctly
                    //we save it with an output stream, so we can compare the saved file with the original file:
                    //they are the same.
                    if (fos != null) {
                        try {
                            fos.write(ab, 0, len);
                        } catch (IOException ex) {
                        }
                    }
                    q2save.offer(dp);
                }
            }
        }
        FileOutputStream fos;
        MappedByteBuffer bb = null;
        byte[] bbO;
        int offset;

        /**
         * In the current servlet specification, I do not know whether read
         * can return -1 or not. This notification should have the
         * same effect as read returning -1.
         *
         */
        @Override
        public void onAllDataRead() {
            if (WebApp.debug) {
                System.out.println("BA.put onAllDataRead, bytesReceived:" + bytesReceived + ", size:" + size);
            }
            try {
                //save();
                if (bytesReceived < size) {
                    exweb = new Ex_Coded4Web(WebApp.Reason_SizeLessThanDeclaredLength, "size " + bytesReceived + " is less than the inited length:" + size);
                }
            } catch (Throwable t) {
                if (WebApp.debug) {
                    t.printStackTrace();

                }
            }
            if (fos != null) {
                try {
                    fos.flush();
                    fos.close();
                } catch (IOException ex) {
                }
            }
            complete();
        }

        //of reading from network
        @Override
        public void onError(Throwable t) {
            try {
                //one case: the browser crashed or was refreshed before the upload finished, so the upload is canceled.
                System.out.println("onError when receiving data from network");
                t.printStackTrace();

                exweb = new Ex_Coded4Web(WebApp.Reason_FailedOnReceiving, "onError when receiving:" + t.getMessage());
                //we do not flush here, since q2save might not be empty yet
                //we just call save instead
                complete(); //but why onComplete is not called as expected??
            } catch (Throwable t2) {
                System.out.println("unexpected inside onError");
                t2.printStackTrace();

            }
        }

        //servlet asyncContext complete, onError will also call this?
        /**
         * We have two copies of the received data: DataPiece.ab and DataPiece.ab2.
         * We compare them with each other and with the original data.
         * 
         * We find that the copy returned by ServletInputStream.read has changed
         * when len=4096; when len is not 4096, it has not changed.
         * 
         * 
         * @param event 
         */
        @Override
        public void onComplete(AsyncEvent event) {
            try { //this is the original file
                File f = new File("/home/jack/Pictures/a.pdf");
                FileChannel fc = new FileInputStream(f).getChannel();
                bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, f.length());
                /*
                if (bb.hasArray()) {
                    System.out.println("hasArray:yes!");
                    bbO = bb.array();
                    offset = bb.arrayOffset();
                } else {
                    System.out.println("hasArray:No!");
                }
                */
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            Iterator<DataPiece> I = q2save.iterator();
            while (I.hasNext()) {
                DataPiece dp = I.next();
                bb.position(dp.offset);
                ByteBuffer bb1 = ByteBuffer.wrap(dp.ab, 0, dp.len); //or dp.ab2
                boolean e1 = Util.equals(bb1, bb);
                String msg = "bytesSaved:" + bytesSaved + "+" + dp.len + " bb1:" + e1;
                bb.position(dp.offset);
                ByteBuffer bb2 = ByteBuffer.wrap(dp.ab2, 0, dp.len); //or dp.ab
                msg += " bb2:" + Util.equals(bb2, bb) + " ab:ab2" + Util.equals(dp.ab, 0, dp.len, dp.ab2, 0);
                System.out.println(msg);
/*
 * 
I got 9 of bb1:true and 242 bb1:false,
     251 of bb2:true and 0 bb2:false
     9 of ab:ab2true and 242 ab:ab2false
 *   if I swap bb1 & bb2, then the result is reversed.
 */                
            }
        }

        @Override
        public void onTimeout(AsyncEvent event) throws IOException {
        }

        @Override
        public void onError(AsyncEvent event) throws IOException {
        }

        @Override
        public void onStartAsync(AsyncEvent event) throws IOException {
        }

        protected void complete() {
            StringWriter sw = new StringWriter();
            try {
                JsonGenerator g = Json.createGenerator(sw); //out
                g.writeStartObject().write("idReq", idReq);
                if (WebApp.writeReason(g, exweb)) { //no more information
                }
                g.writeEnd();
                g.flush();
            } catch (Throwable t) {
                try {
                    sw = new StringWriter();
                    JsonGenerator g = Json.createGenerator(sw); //out
                    g.writeStartObject().write("idReq", idReq);
                    if (WebApp.writeReason(g, exweb)) { //no more information
                    }
                    g.writeEnd();
                    g.flush();
                } catch (Throwable t2) {
                    return;
                }
            }
            try {
                WebApp.sendRes_json(sw.toString(), response);
                //os.close(); we do not open it.
                //PrintWriter out=response.getWriter();
                //out.print(json);out.flush();
            } catch (Throwable ex) {
                //just swallow it //Logger.getLogger(BA.class.getName()).log(Level.SEVERE, null, ex);
            }
            ac.complete();
        }

    }

    static class DataPiece {

        //ByteBuffer bb;
        int offset, len;
        byte[] ab, ab2;

        DataPiece(byte[] ab, int offset, int len) {
            this.ab = ab;
            this.offset = offset;
            this.len = len;
            ab2 = new byte[len];
            System.arraycopy(ab, 0, ab2, 0, len);
            //bb = ByteBuffer.wrap(ab2, 0, len);
        }
    }
}
Comment 1 Violeta Georgieva 2017-07-11 17:47:22 UTC
Hi,

What is the problem that you think there is?


Regards,
Violeta
Comment 2 Jack 2017-07-12 05:27:40 UTC
(In reply to Violeta Georgieva from comment #1)
> What is the problem that you think there is?
I am sorry, let me state it one more time, I hope I can make it clear this time.

The servlet specification supports NIO, so when a user uploads a file to the server, a servlet on the server side does not have to read the data by blocking on inputstream=request.getInputStream(). Instead, we set a ReadListener on the input stream, and when data is available we are notified via onDataAvailable(). This is good.

The problem is that inside onDataAvailable we read the data:
byte[] ab=new byte[4096];
int len=is.read(ab,0,ab.length);

We have to use ab inside the onDataAvailable method; once this method returns, the content of ab is changed for no reason! This is very weird, and should not happen unless the specification says the data is only valid inside the onDataAvailable method.

So what I do to get around it is copy the data:
byte[] ab2=new byte[len];
System.arraycopy(ab,0,ab2,0,len);

Then the content of ab2 is reliable; even after a long time, the content of ab2 has not changed.


Did I succeed in explaining this? Let me know if you have any doubts.
Comment 3 Violeta Georgieva 2017-07-12 07:42:03 UTC
Hi,

(In reply to Jack from comment #2)
> (In reply to Violeta Georgieva from comment #1)
> > What is the problem that you think there is?
> I am sorry, let me state it one more time, I hope I can make it clear this
> time.
> 
> servlet specification support nio, so when a user upload a file to server,
> at the server side, a servlet does not have to use the
> inputstream=request.getInputStream() to read the data. Instead, we set a
> readListener to the inputstream, when data is available, we got notified
> onDataAvailable(). This is good.
> 
> The problem is that inside onDataAvailable, we read data
> byte[] ab=new byte[4096];
> int len=is.read(ab,0,ab.length);
>

How many times is onDataAvailable invoked in your scenario?

> 
> we have to use ab inside onDataAvailable method, once this method returns,
> the content of ab will be changed for no reason! This is very weird, and
> should not happen unless the specification indicates the data is only valid
> inside onDataAvailable method. 

The container does not keep a reference to the array that is passed to the read method. However, as this is a non-blocking read, you may expect the following to happen:

- onDataAvailable is invoked by the container because there is data available for reading
- The code enters the while block, isReady returns true and the code reads the available data
- on the next iteration isReady returns false and the code exits the onDataAvailable method
- when data is available for reading again, the container will invoke onDataAvailable again
- on this invocation the code will use the same byte array for reading, so the data in the byte array will be replaced with the new data

At some point the container will invoke the onAllDataRead method to indicate that there is no more data to read. At that point the byte array "ab" will contain the data from the last read.
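
The overwriting described above can be reproduced without a container. The following is only a minimal sketch: it reads from a plain ByteArrayInputStream instead of Tomcat's CoyoteInputStream, and the names BufferReuseDemo, readSharingOneBuffer and readCopyingEachChunk are hypothetical, not taken from the reporter's code. It assumes nothing beyond java.io and java.util.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; it uses only java.io and java.util, no servlet API.
final class BufferReuseDemo {

    // Stores a reference to the SAME array for every chunk that is read.
    static List<byte[]> readSharingOneBuffer(InputStream in, int chunkSize) {
        List<byte[]> pieces = new ArrayList<>();
        byte[] ab = new byte[chunkSize]; // allocated once, outside the loop
        try {
            while (in.read(ab, 0, ab.length) != -1) {
                pieces.add(ab); // every list entry aliases the one array
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return pieces;
    }

    // Stores an independent copy of the bytes that were actually read.
    static List<byte[]> readCopyingEachChunk(InputStream in, int chunkSize) {
        List<byte[]> pieces = new ArrayList<>();
        byte[] ab = new byte[chunkSize];
        try {
            int len;
            while ((len = in.read(ab, 0, ab.length)) != -1) {
                pieces.add(Arrays.copyOf(ab, len)); // copy only the len valid bytes
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return pieces;
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        List<byte[]> shared = readSharingOneBuffer(new ByteArrayInputStream(data), 4);
        List<byte[]> copied = readCopyingEachChunk(new ByteArrayInputStream(data), 4);
        // The first stored piece was overwritten by the two later reads.
        System.out.println(Arrays.toString(shared.get(0))); // prints [9, 10, 7, 8]
        // The copy still holds the first four bytes of the input.
        System.out.println(Arrays.toString(copied.get(0))); // prints [1, 2, 3, 4]
    }
}
```

Only the bytes written by the most recent read still match the input when the array is shared. That would be consistent with the report that only a few pieces compared equal, although the exact counts depend on how the container delivers the data.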

> 
> so what I do to get around it is to copy the data.
> byte[] ab2=new byte[len];
> System.arraycopy(ab,0,ab2,0,len);
> 
> then the content of ab2 is reliable, even after long time, the content of
> ab2 is not changed.
> 
> 
> Did I succeed in explaining this? Let me know if you have any doubt.

Regards,
Violeta
Comment 4 Jack 2017-07-12 08:05:20 UTC
I am very sorry, the byte array is allocated outside the while loop, so it was my mistake! There is no bug here, please close this thread!

Violeta, thank you for your time!