Extensive changes that _should_ fix the socket buffering problems,
[u/mdw/putty] / plink.c
diff --git a/plink.c b/plink.c
index 4113a81..4217116 100644 (file)
--- a/plink.c
+++ b/plink.c
@@ -15,6 +15,8 @@
 #include "storage.h"
 #include "tree234.h"
 
+#define MAX_STDIN_BACKLOG 4096
+
 void fatalbox(char *p, ...)
 {
     va_list ap;
@@ -125,20 +127,6 @@ DWORD orig_console_mode;
 
 WSAEVENT netevent;
 
-void from_backend(int is_stderr, char *data, int len)
-{
-    int pos;
-    DWORD ret;
-    HANDLE h = (is_stderr ? errhandle : outhandle);
-
-    pos = 0;
-    while (pos < len) {
-       if (!WriteFile(h, data + pos, len - pos, &ret, NULL))
-           return;                    /* give up in panic */
-       pos += ret;
-    }
-}
-
 int term_ldisc(int mode)
 {
     return FALSE;
@@ -160,12 +148,6 @@ void ldisc_update(int echo, int edit)
     SetConsoleMode(inhandle, mode);
 }
 
-struct input_data {
-    DWORD len;
-    char buffer[4096];
-    HANDLE event, eventback;
-};
-
 static int get_line(const char *prompt, char *str, int maxlen, int is_pw)
 {
     HANDLE hin, hout;
@@ -216,6 +198,12 @@ static int get_line(const char *prompt, char *str, int maxlen, int is_pw)
     return 1;
 }
 
+struct input_data {
+    DWORD len;
+    char buffer[4096];
+    HANDLE event, eventback;
+};
+
 static DWORD WINAPI stdin_read_thread(void *param)
 {
     struct input_data *idata = (struct input_data *) param;
@@ -235,6 +223,78 @@ static DWORD WINAPI stdin_read_thread(void *param)
     return 0;
 }
 
+struct output_data {
+    DWORD len, lenwritten;
+    int writeret;
+    char *buffer;
+    int is_stderr, done;
+    HANDLE event, eventback;
+    int busy;
+};
+
+static DWORD WINAPI stdout_write_thread(void *param)
+{
+    struct output_data *odata = (struct output_data *) param;
+    HANDLE outhandle, errhandle;
+
+    outhandle = GetStdHandle(STD_OUTPUT_HANDLE);
+    errhandle = GetStdHandle(STD_ERROR_HANDLE);
+
+    while (1) {
+       WaitForSingleObject(odata->eventback, INFINITE);
+       if (odata->done)
+           break;
+       odata->writeret =
+           WriteFile(odata->is_stderr ? errhandle : outhandle,
+                     odata->buffer, odata->len, &odata->lenwritten, NULL);
+       SetEvent(odata->event);
+    }
+
+    return 0;
+}
+
+bufchain stdout_data, stderr_data;
+struct output_data odata, edata;
+
+void try_output(int is_stderr)
+{
+    struct output_data *data = (is_stderr ? &edata : &odata);
+    void *senddata;
+    int sendlen;
+
+    if (!data->busy) {
+       bufchain_prefix(is_stderr ? &stderr_data : &stdout_data,
+                       &senddata, &sendlen);
+       data->buffer = senddata;
+       data->len = sendlen;
+       SetEvent(data->eventback);
+       data->busy = 1;
+    }
+}
+
+int from_backend(int is_stderr, char *data, int len)
+{
+    int pos;
+    DWORD ret;
+    HANDLE h = (is_stderr ? errhandle : outhandle);
+    void *writedata;
+    int writelen;
+    int osize, esize;
+
+    if (is_stderr) {
+       bufchain_add(&stderr_data, data, len);
+       try_output(1);
+    } else {
+       bufchain_add(&stdout_data, data, len);
+       try_output(0);
+    }
+
+    osize = bufchain_size(&stdout_data);
+    esize = bufchain_size(&stderr_data);
+
+    return osize + esize;
+}
+
 /*
  *  Short description of parameters.
  */
@@ -275,10 +335,11 @@ int main(int argc, char **argv)
 {
     WSADATA wsadata;
     WORD winsock_ver;
-    WSAEVENT stdinevent;
-    HANDLE handles[2];
-    DWORD threadid;
+    WSAEVENT stdinevent, stdoutevent, stderrevent;
+    HANDLE handles[4];
+    DWORD in_threadid, out_threadid, err_threadid;
     struct input_data idata;
+    int reading;
     int sending;
     int portnumber = -1;
     SOCKET *sklist;
@@ -554,6 +615,8 @@ int main(int argc, char **argv)
     connopen = 1;
 
     stdinevent = CreateEvent(NULL, FALSE, FALSE, NULL);
+    stdoutevent = CreateEvent(NULL, FALSE, FALSE, NULL);
+    stderrevent = CreateEvent(NULL, FALSE, FALSE, NULL);
 
     inhandle = GetStdHandle(STD_INPUT_HANDLE);
     outhandle = GetStdHandle(STD_OUTPUT_HANDLE);
@@ -568,7 +631,33 @@ int main(int argc, char **argv)
      */
     handles[0] = netevent;
     handles[1] = stdinevent;
+    handles[2] = stdoutevent;
+    handles[3] = stderrevent;
     sending = FALSE;
+
+    /*
+     * Create spare threads to write to stdout and stderr, so we
+     * can arrange asynchronous writes.
+     */
+    odata.event = stdoutevent;
+    odata.eventback = CreateEvent(NULL, FALSE, FALSE, NULL);
+    odata.is_stderr = 0;
+    odata.busy = odata.done = 0;
+    if (!CreateThread(NULL, 0, stdout_write_thread,
+                     &odata, 0, &out_threadid)) {
+       fprintf(stderr, "Unable to create output thread\n");
+       exit(1);
+    }
+    edata.event = stderrevent;
+    edata.eventback = CreateEvent(NULL, FALSE, FALSE, NULL);
+    edata.is_stderr = 1;
+    edata.busy = edata.done = 0;
+    if (!CreateThread(NULL, 0, stdout_write_thread,
+                     &edata, 0, &err_threadid)) {
+       fprintf(stderr, "Unable to create error output thread\n");
+       exit(1);
+    }
+
     while (1) {
        int n;
 
@@ -592,14 +681,14 @@ int main(int argc, char **argv)
            idata.event = stdinevent;
            idata.eventback = CreateEvent(NULL, FALSE, FALSE, NULL);
            if (!CreateThread(NULL, 0, stdin_read_thread,
-                             &idata, 0, &threadid)) {
-               fprintf(stderr, "Unable to create second thread\n");
+                             &idata, 0, &in_threadid)) {
+               fprintf(stderr, "Unable to create input thread\n");
                exit(1);
            }
            sending = TRUE;
        }
 
-       n = WaitForMultipleObjects(2, handles, FALSE, INFINITE);
+       n = WaitForMultipleObjects(4, handles, FALSE, INFINITE);
        if (n == 0) {
            WSANETWORKEVENTS things;
            SOCKET socket;
@@ -655,13 +744,39 @@ int main(int argc, char **argv)
                }
            }
        } else if (n == 1) {
+           reading = 0;
            noise_ultralight(idata.len);
            if (idata.len > 0) {
                back->send(idata.buffer, idata.len);
            } else {
                back->special(TS_EOF);
            }
+       } else if (n == 2) {
+           odata.busy = 0;
+           if (!odata.writeret) {
+               fprintf(stderr, "Unable to write to standard output\n");
+               exit(0);
+           }
+           bufchain_consume(&stdout_data, odata.lenwritten);
+           if (bufchain_size(&stdout_data) > 0)
+               try_output(0);
+           back->unthrottle(bufchain_size(&stdout_data) +
+                            bufchain_size(&stderr_data));
+       } else if (n == 3) {
+           edata.busy = 0;
+           if (!edata.writeret) {
+               fprintf(stderr, "Unable to write to standard output\n");
+               exit(0);
+           }
+           bufchain_consume(&stderr_data, edata.lenwritten);
+           if (bufchain_size(&stderr_data) > 0)
+               try_output(1);
+           back->unthrottle(bufchain_size(&stdout_data) +
+                            bufchain_size(&stderr_data));
+       }
+       if (!reading && back->sendbuffer() < MAX_STDIN_BACKLOG) {
            SetEvent(idata.eventback);
+           reading = 1;
        }
        if (!connopen || back->socket() == NULL)
            break;                     /* we closed the connection */