Extensive changes that _should_ fix the socket buffering problems,
authorsimon <simon@cda61777-01e9-0310-a592-d414129be87e>
Sat, 25 Aug 2001 17:09:23 +0000 (17:09 +0000)
committersimon <simon@cda61777-01e9-0310-a592-d414129be87e>
Sat, 25 Aug 2001 17:09:23 +0000 (17:09 +0000)
by ceasing to listen on input channels if the corresponding output
channel isn't accepting data. Has had basic check-I-didn't-actually-
break-anything-too-badly testing, but hasn't been genuinely tested
in stress conditions (because concocting stress conditions is non-
trivial).

git-svn-id: svn://svn.tartarus.org/sgt/putty@1198 cda61777-01e9-0310-a592-d414129be87e

17 files changed:
misc.c
misc.h
network.h
plink.c
portfwd.c
psftp.c
putty.h
raw.c
rlogin.c
scp.c
ssh.c
ssh.h
telnet.c
terminal.c
window.c
winnet.c
x11fwd.c

diff --git a/misc.c b/misc.c
index 2fabbb6..4af00d6 100644 (file)
--- a/misc.c
+++ b/misc.c
@@ -1,9 +1,105 @@
 #include <windows.h>
 #include <stdio.h>
 #include <stdlib.h>
+#include <assert.h>
 #include "putty.h"
 
 /*
+ * Generic routines to deal with send buffers: a linked list of
+ * smallish blocks, with the operations
+ * 
+ *  - add an arbitrary amount of data to the end of the list
+ *  - remove the first N bytes from the list
+ *  - return a (pointer,length) pair giving some initial data in
+ *    the list, suitable for passing to a send or write system
+ *    call
+ *  - return the current size of the buffer chain in bytes
+ */
+
+#define BUFFER_GRANULE  512
+
+struct bufchain_granule {
+    struct bufchain_granule *next;
+    int buflen, bufpos;
+    char buf[BUFFER_GRANULE];
+};
+
+void bufchain_init(bufchain *ch)
+{
+    ch->head = ch->tail = NULL;
+    ch->buffersize = 0;
+}
+
+void bufchain_clear(bufchain *ch)
+{
+    struct bufchain_granule *b;
+    while (ch->head) {
+       b = ch->head;
+       ch->head = ch->head->next;
+       sfree(b);
+    }
+    ch->tail = NULL;
+    ch->buffersize = 0;
+}
+
+int bufchain_size(bufchain *ch)
+{
+    return ch->buffersize;
+}
+
+void bufchain_add(bufchain *ch, void *data, int len)
+{
+    char *buf = (char *)data;
+
+    ch->buffersize += len;
+
+    if (ch->tail && ch->tail->buflen < BUFFER_GRANULE) {
+       int copylen = min(len, BUFFER_GRANULE - ch->tail->buflen);
+       memcpy(ch->tail->buf + ch->tail->buflen, buf, copylen);
+       buf += copylen;
+       len -= copylen;
+       ch->tail->buflen += copylen;
+    }
+    while (len > 0) {
+       int grainlen = min(len, BUFFER_GRANULE);
+       struct bufchain_granule *newbuf;
+       newbuf = smalloc(sizeof(struct bufchain_granule));
+       newbuf->bufpos = 0;
+       newbuf->buflen = grainlen;
+       memcpy(newbuf->buf, buf, grainlen);
+       buf += grainlen;
+       len -= grainlen;
+       if (ch->tail)
+           ch->tail->next = newbuf;
+       else
+           ch->head = ch->tail = newbuf;
+       newbuf->next = NULL;
+       ch->tail = newbuf;
+    }
+}
+
+void bufchain_consume(bufchain *ch, int len)
+{
+    assert(ch->buffersize >= len);
+    assert(ch->head != NULL && ch->head->bufpos + len <= ch->head->buflen);
+    ch->head->bufpos += len;
+    ch->buffersize -= len;
+    if (ch->head->bufpos >= ch->head->buflen) {
+       struct bufchain_granule *tmp = ch->head;
+       ch->head = tmp->next;
+       sfree(tmp);
+       if (!ch->head)
+           ch->tail = NULL;
+    }
+}
+
+void bufchain_prefix(bufchain *ch, void **data, int *len)
+{
+    *len = ch->head->buflen - ch->head->bufpos;
+    *data = ch->head->buf + ch->head->bufpos;
+}
+
+/*
  * My own versions of malloc, realloc and free. Because I want
  * malloc and realloc to bomb out and exit the program if they run
  * out of memory, realloc to reliably call malloc if passed a NULL
diff --git a/misc.h b/misc.h
index 754e9fe..d9b3811 100644 (file)
--- a/misc.h
+++ b/misc.h
@@ -3,6 +3,18 @@
 
 #include "puttymem.h"
 
+struct bufchain_granule;
+typedef struct bufchain_tag {
+    struct bufchain_granule *head, *tail;
+    int buffersize;                   /* current amount of buffered data */
+} bufchain;
+
+void bufchain_init(bufchain *ch);
+void bufchain_clear(bufchain *ch);
+int bufchain_size(bufchain *ch);
+void bufchain_add(bufchain *ch, void *data, int len);
+void bufchain_prefix(bufchain *ch, void **data, int *len);
+void bufchain_consume(bufchain *ch, int len);
 
 /*
  * Debugging functions.
index e205b87..0e1420a 100644 (file)
--- a/network.h
+++ b/network.h
@@ -24,8 +24,8 @@ struct socket_function_table {
     /* if p is NULL, it doesn't change the plug */
     /* but it does return the one it's using */
     void (*close) (Socket s);
-    void (*write) (Socket s, char *data, int len);
-    void (*write_oob) (Socket s, char *data, int len);
+    int (*write) (Socket s, char *data, int len);
+    int (*write_oob) (Socket s, char *data, int len);
     void (*flush) (Socket s);
     /* ignored by tcp, but vital for ssl */
     char *(*socket_error) (Socket s);
@@ -48,6 +48,12 @@ struct plug_function_table {
      *  - urgent==2. `data' points to `len' bytes of data,
      *    the first of which was the one at the Urgent mark.
      */
+    void (*sent) (Plug p, int bufsize);
+    /*
+     * The `sent' function is called when the pending send backlog
+     * on a socket is cleared or partially cleared. The new backlog
+     * size is passed in the `bufsize' parameter.
+     */
     int (*accepting)(Plug p, struct sockaddr *addr, void *sock);
     /*
      * returns 0 if the host at address addr is a valid host for connecting or error
@@ -76,6 +82,7 @@ Socket sk_register(void *sock, Plug plug);
 #ifdef DEFINE_PLUG_METHOD_MACROS
 #define plug_closing(p,msg,code,callback) (((*p)->closing) (p, msg, code, callback))
 #define plug_receive(p,urgent,buf,len) (((*p)->receive) (p, urgent, buf, len))
+#define plug_sent(p,bufsize) (((*p)->sent) (p, bufsize))
 #define plug_accepting(p, addr, sock) (((*p)->accepting)(p, addr, sock))
 #endif
 
@@ -99,10 +106,20 @@ char *sk_addr_error(SockAddr addr);
 
 /*
  * Set the `frozen' flag on a socket. A frozen socket is one in
- * which all sends are buffered and receives are ignored. This is
- * so that (for example) a new port-forwarding can sit in limbo
- * until its associated SSH channel is ready, and then pending data
- * can be sent on.
+ * which all READABLE notifications are ignored, so that data is
+ * not accepted from the peer until the socket is unfrozen. This
+ * exists for two purposes:
+ * 
+ *  - Port forwarding: when a local listening port receives a
+ *    connection, we do not want to receive data from the new
+ *    socket until we have somewhere to send it. Hence, we freeze
+ *    the socket until its associated SSH channel is ready; then we
+ *    unfreeze it and pending data is delivered.
+ * 
+ *  - Socket buffering: if an SSH channel (or the whole connection)
+ *    backs up or presents a zero window, we must freeze the
+ *    associated local socket in order to avoid unbounded buffer
+ *    growth.
  */
 void sk_set_frozen(Socket sock, int is_frozen);
 
diff --git a/plink.c b/plink.c
index 4113a81..4217116 100644 (file)
--- a/plink.c
+++ b/plink.c
@@ -15,6 +15,8 @@
 #include "storage.h"
 #include "tree234.h"
 
+#define MAX_STDIN_BACKLOG 4096
+
 void fatalbox(char *p, ...)
 {
     va_list ap;
@@ -125,20 +127,6 @@ DWORD orig_console_mode;
 
 WSAEVENT netevent;
 
-void from_backend(int is_stderr, char *data, int len)
-{
-    int pos;
-    DWORD ret;
-    HANDLE h = (is_stderr ? errhandle : outhandle);
-
-    pos = 0;
-    while (pos < len) {
-       if (!WriteFile(h, data + pos, len - pos, &ret, NULL))
-           return;                    /* give up in panic */
-       pos += ret;
-    }
-}
-
 int term_ldisc(int mode)
 {
     return FALSE;
@@ -160,12 +148,6 @@ void ldisc_update(int echo, int edit)
     SetConsoleMode(inhandle, mode);
 }
 
-struct input_data {
-    DWORD len;
-    char buffer[4096];
-    HANDLE event, eventback;
-};
-
 static int get_line(const char *prompt, char *str, int maxlen, int is_pw)
 {
     HANDLE hin, hout;
@@ -216,6 +198,12 @@ static int get_line(const char *prompt, char *str, int maxlen, int is_pw)
     return 1;
 }
 
+struct input_data {
+    DWORD len;
+    char buffer[4096];
+    HANDLE event, eventback;
+};
+
 static DWORD WINAPI stdin_read_thread(void *param)
 {
     struct input_data *idata = (struct input_data *) param;
@@ -235,6 +223,78 @@ static DWORD WINAPI stdin_read_thread(void *param)
     return 0;
 }
 
+struct output_data {
+    DWORD len, lenwritten;
+    int writeret;
+    char *buffer;
+    int is_stderr, done;
+    HANDLE event, eventback;
+    int busy;
+};
+
+static DWORD WINAPI stdout_write_thread(void *param)
+{
+    struct output_data *odata = (struct output_data *) param;
+    HANDLE outhandle, errhandle;
+
+    outhandle = GetStdHandle(STD_OUTPUT_HANDLE);
+    errhandle = GetStdHandle(STD_ERROR_HANDLE);
+
+    while (1) {
+       WaitForSingleObject(odata->eventback, INFINITE);
+       if (odata->done)
+           break;
+       odata->writeret =
+           WriteFile(odata->is_stderr ? errhandle : outhandle,
+                     odata->buffer, odata->len, &odata->lenwritten, NULL);
+       SetEvent(odata->event);
+    }
+
+    return 0;
+}
+
+bufchain stdout_data, stderr_data;
+struct output_data odata, edata;
+
+void try_output(int is_stderr)
+{
+    struct output_data *data = (is_stderr ? &edata : &odata);
+    void *senddata;
+    int sendlen;
+
+    if (!data->busy) {
+       bufchain_prefix(is_stderr ? &stderr_data : &stdout_data,
+                       &senddata, &sendlen);
+       data->buffer = senddata;
+       data->len = sendlen;
+       SetEvent(data->eventback);
+       data->busy = 1;
+    }
+}
+
+int from_backend(int is_stderr, char *data, int len)
+{
+    int pos;
+    DWORD ret;
+    HANDLE h = (is_stderr ? errhandle : outhandle);
+    void *writedata;
+    int writelen;
+    int osize, esize;
+
+    if (is_stderr) {
+       bufchain_add(&stderr_data, data, len);
+       try_output(1);
+    } else {
+       bufchain_add(&stdout_data, data, len);
+       try_output(0);
+    }
+
+    osize = bufchain_size(&stdout_data);
+    esize = bufchain_size(&stderr_data);
+
+    return osize + esize;
+}
+
 /*
  *  Short description of parameters.
  */
@@ -275,10 +335,11 @@ int main(int argc, char **argv)
 {
     WSADATA wsadata;
     WORD winsock_ver;
-    WSAEVENT stdinevent;
-    HANDLE handles[2];
-    DWORD threadid;
+    WSAEVENT stdinevent, stdoutevent, stderrevent;
+    HANDLE handles[4];
+    DWORD in_threadid, out_threadid, err_threadid;
     struct input_data idata;
+    int reading;
     int sending;
     int portnumber = -1;
     SOCKET *sklist;
@@ -554,6 +615,8 @@ int main(int argc, char **argv)
     connopen = 1;
 
     stdinevent = CreateEvent(NULL, FALSE, FALSE, NULL);
+    stdoutevent = CreateEvent(NULL, FALSE, FALSE, NULL);
+    stderrevent = CreateEvent(NULL, FALSE, FALSE, NULL);
 
     inhandle = GetStdHandle(STD_INPUT_HANDLE);
     outhandle = GetStdHandle(STD_OUTPUT_HANDLE);
@@ -568,7 +631,33 @@ int main(int argc, char **argv)
      */
     handles[0] = netevent;
     handles[1] = stdinevent;
+    handles[2] = stdoutevent;
+    handles[3] = stderrevent;
     sending = FALSE;
+
+    /*
+     * Create spare threads to write to stdout and stderr, so we
+     * can arrange asynchronous writes.
+     */
+    odata.event = stdoutevent;
+    odata.eventback = CreateEvent(NULL, FALSE, FALSE, NULL);
+    odata.is_stderr = 0;
+    odata.busy = odata.done = 0;
+    if (!CreateThread(NULL, 0, stdout_write_thread,
+                     &odata, 0, &out_threadid)) {
+       fprintf(stderr, "Unable to create output thread\n");
+       exit(1);
+    }
+    edata.event = stderrevent;
+    edata.eventback = CreateEvent(NULL, FALSE, FALSE, NULL);
+    edata.is_stderr = 1;
+    edata.busy = edata.done = 0;
+    if (!CreateThread(NULL, 0, stdout_write_thread,
+                     &edata, 0, &err_threadid)) {
+       fprintf(stderr, "Unable to create error output thread\n");
+       exit(1);
+    }
+
     while (1) {
        int n;
 
@@ -592,14 +681,14 @@ int main(int argc, char **argv)
            idata.event = stdinevent;
            idata.eventback = CreateEvent(NULL, FALSE, FALSE, NULL);
            if (!CreateThread(NULL, 0, stdin_read_thread,
-                             &idata, 0, &threadid)) {
-               fprintf(stderr, "Unable to create second thread\n");
+                             &idata, 0, &in_threadid)) {
+               fprintf(stderr, "Unable to create input thread\n");
                exit(1);
            }
            sending = TRUE;
        }
 
-       n = WaitForMultipleObjects(2, handles, FALSE, INFINITE);
+       n = WaitForMultipleObjects(4, handles, FALSE, INFINITE);
        if (n == 0) {
            WSANETWORKEVENTS things;
            SOCKET socket;
@@ -655,13 +744,39 @@ int main(int argc, char **argv)
                }
            }
        } else if (n == 1) {
+           reading = 0;
            noise_ultralight(idata.len);
            if (idata.len > 0) {
                back->send(idata.buffer, idata.len);
            } else {
                back->special(TS_EOF);
            }
+       } else if (n == 2) {
+           odata.busy = 0;
+           if (!odata.writeret) {
+               fprintf(stderr, "Unable to write to standard output\n");
+               exit(0);
+           }
+           bufchain_consume(&stdout_data, odata.lenwritten);
+           if (bufchain_size(&stdout_data) > 0)
+               try_output(0);
+           back->unthrottle(bufchain_size(&stdout_data) +
+                            bufchain_size(&stderr_data));
+       } else if (n == 3) {
+           edata.busy = 0;
+           if (!edata.writeret) {
+               fprintf(stderr, "Unable to write to standard output\n");
+               exit(0);
+           }
+           bufchain_consume(&stderr_data, edata.lenwritten);
+           if (bufchain_size(&stderr_data) > 0)
+               try_output(1);
+           back->unthrottle(bufchain_size(&stdout_data) +
+                            bufchain_size(&stderr_data));
+       }
+       if (!reading && back->sendbuffer() < MAX_STDIN_BACKLOG) {
            SetEvent(idata.eventback);
+           reading = 1;
        }
        if (!connopen || back->socket() == NULL)
            break;                     /* we closed the connection */
index 4d77ae1..4335398 100644 (file)
--- a/portfwd.c
+++ b/portfwd.c
@@ -52,9 +52,6 @@
   (cp)[0] = (value) >> 8, \
   (cp)[1] = (value) )
 
-extern void sshfwd_close(void *);
-extern void sshfwd_write(void *, char *, int);
-
 struct pfwd_queue {
     struct pfwd_queue *next;
     char *buf;
@@ -66,6 +63,7 @@ struct PFwdPrivate {
     void *c;                          /* (channel) data used by ssh.c */
     Socket s;
     char hostname[128];
+    int throttled, throttle_override;
     int port;
     int ready;
     struct pfwd_queue *waiting;
@@ -92,12 +90,22 @@ static int pfd_closing(Plug plug, char *error_msg, int error_code,
 static int pfd_receive(Plug plug, int urgent, char *data, int len)
 {
     struct PFwdPrivate *pr = (struct PFwdPrivate *) plug;
-
-    if (pr->ready)
-       sshfwd_write(pr->c, data, len);
+    if (pr->ready) {
+       if (sshfwd_write(pr->c, data, len) > 0) {
+           pr->throttled = 1;
+           sk_set_frozen(pr->s, 1);
+       }
+    }
     return 1;
 }
 
+static void pfd_sent(Plug plug, int bufsize)
+{
+    struct PFwdPrivate *pr = (struct PFwdPrivate *) plug;
+
+    sshfwd_unthrottle(pr->c, bufsize);
+}
+
 /*
  * Called when receiving a PORT OPEN from the server
  */
@@ -106,6 +114,7 @@ char *pfd_newconnect(Socket *s, char *hostname, int port, void *c)
     static struct plug_function_table fn_table = {
        pfd_closing,
        pfd_receive,
+       pfd_sent,
        NULL
     };
 
@@ -125,6 +134,7 @@ char *pfd_newconnect(Socket *s, char *hostname, int port, void *c)
      */
     pr = (struct PFwdPrivate *) smalloc(sizeof(struct PFwdPrivate));
     pr->fn = &fn_table;
+    pr->throttled = pr->throttle_override = 0;
     pr->ready = 1;
     pr->c = c;
 
@@ -149,6 +159,7 @@ static int pfd_accepting(Plug p, struct sockaddr *addr, void *sock)
     static struct plug_function_table fn_table = {
        pfd_closing,
        pfd_receive,
+       pfd_sent,
        NULL
     };
     struct PFwdPrivate *pr, *org;
@@ -175,6 +186,7 @@ static int pfd_accepting(Plug p, struct sockaddr *addr, void *sock)
 
     strcpy(pr->hostname, org->hostname);
     pr->port = org->port;
+    pr->throttled = pr->throttle_override = 0;
     pr->ready = 0;
     pr->waiting = NULL;
 
@@ -199,7 +211,8 @@ char *pfd_addforward(char *desthost, int destport, int port)
 {
     static struct plug_function_table fn_table = {
        pfd_closing,
-       pfd_receive, /* should not happen... */
+       pfd_receive,                   /* should not happen... */
+       pfd_sent,                      /* also should not happen */
        pfd_accepting
     };
 
@@ -215,6 +228,7 @@ char *pfd_addforward(char *desthost, int destport, int port)
     pr->c = NULL;
     strcpy(pr->hostname, desthost);
     pr->port = destport;
+    pr->throttled = pr->throttle_override = 0;
     pr->ready = 0;
     pr->waiting = NULL;
 
@@ -243,15 +257,36 @@ void pfd_close(Socket s)
     sk_close(s);
 }
 
+void pfd_unthrottle(Socket s)
+{
+    struct PFwdPrivate *pr;
+    if (!s)
+       return;
+    pr = (struct PFwdPrivate *) sk_get_private_ptr(s);
+
+    pr->throttled = 0;
+    sk_set_frozen(s, pr->throttled || pr->throttle_override);
+}
+
+void pfd_override_throttle(Socket s, int enable)
+{
+    struct PFwdPrivate *pr;
+    if (!s)
+       return;
+    pr = (struct PFwdPrivate *) sk_get_private_ptr(s);
+
+    pr->throttle_override = enable;
+    sk_set_frozen(s, pr->throttled || pr->throttle_override);
+}
+
 /*
  * Called to send data down the raw connection.
  */
-void pfd_send(Socket s, char *data, int len)
+int pfd_send(Socket s, char *data, int len)
 {
     if (s == NULL)
-       return;
-
-    sk_write(s, data, len);
+       return 0;
+    return sk_write(s, data, len);
 }
 
 
diff --git a/psftp.c b/psftp.c
index 0ab453c..7c10c08 100644 (file)
--- a/psftp.c
+++ b/psftp.c
 #include "sftp.h"
 #include "int64.h"
 
+/*
+ * Since SFTP is a request-response oriented protocol, it requires
+ * no buffer management: when we send data, we stop and wait for an
+ * acknowledgement _anyway_, and so we can't possibly overfill our
+ * send buffer.
+ */
+
 /* ----------------------------------------------------------------------
  * String handling routines.
  */
@@ -912,7 +919,7 @@ static unsigned char *outptr;              /* where to put the data */
 static unsigned outlen;                       /* how much data required */
 static unsigned char *pending = NULL;  /* any spare data */
 static unsigned pendlen = 0, pendsize = 0;     /* length and phys. size of buffer */
-void from_backend(int is_stderr, char *data, int datalen)
+int from_backend(int is_stderr, char *data, int datalen)
 {
     unsigned char *p = (unsigned char *) data;
     unsigned len = (unsigned) datalen;
@@ -923,14 +930,14 @@ void from_backend(int is_stderr, char *data, int datalen)
      */
     if (is_stderr) {
        fwrite(data, 1, len, stderr);
-       return;
+       return 0;
     }
 
     /*
      * If this is before the real session begins, just return.
      */
     if (!outptr)
-       return;
+       return 0;
 
     if (outlen > 0) {
        unsigned used = outlen;
@@ -954,6 +961,8 @@ void from_backend(int is_stderr, char *data, int datalen)
        memcpy(pending + pendlen, p, len);
        pendlen += len;
     }
+
+    return 0;
 }
 int sftp_recvdata(char *buf, int len)
 {
diff --git a/putty.h b/putty.h
index e305c2b..472c318 100644 (file)
--- a/putty.h
+++ b/putty.h
@@ -198,12 +198,20 @@ enum {
 
 typedef struct {
     char *(*init) (char *host, int port, char **realhost);
-    void (*send) (char *buf, int len);
+    /* back->send() returns the current amount of buffered data. */
+    int (*send) (char *buf, int len);
+    /* back->sendbuffer() does the same thing but without attempting a send */
+    int (*sendbuffer) (void);
     void (*size) (void);
     void (*special) (Telnet_Special code);
-     Socket(*socket) (void);
+    Socket(*socket) (void);
     int (*sendok) (void);
     int (*ldisc) (int);
+    /*
+     * back->unthrottle() tells the back end that the front end
+     * buffer is clearing.
+     */
+    void (*unthrottle) (int);
     int default_port;
 } Backend;
 
@@ -436,7 +444,7 @@ void term_do_paste(void);
 void term_paste(void);
 void term_nopaste(void);
 int term_ldisc(int option);
-void from_backend(int is_stderr, char *data, int len);
+int from_backend(int is_stderr, char *data, int len);
 void logfopen(void);
 void logfclose(void);
 void term_copyall(void);
diff --git a/raw.c b/raw.c
index 86fd531..52e4b06 100644 (file)
--- a/raw.c
+++ b/raw.c
 #define TRUE 1
 #endif
 
+#define RAW_MAX_BACKLOG 4096
+
 static Socket s = NULL;
+static int raw_bufsize;
 
 static void raw_size(void);
 
 static void c_write(char *buf, int len)
 {
-    from_backend(0, buf, len);
+    int backlog = from_backend(0, buf, len);
+    sk_set_frozen(s, backlog > RAW_MAX_BACKLOG);
 }
 
 static int raw_closing(Plug plug, char *error_msg, int error_code,
@@ -83,13 +87,23 @@ static char *raw_init(char *host, int port, char **realhost)
 /*
  * Called to send data down the raw connection.
  */
-static void raw_send(char *buf, int len)
+static int raw_send(char *buf, int len)
 {
 
     if (s == NULL)
        return;
 
-    sk_write(s, buf, len);
+    raw_bufsize = sk_write(s, buf, len);
+
+    return raw_bufsize;
+}
+
+/*
+ * Called to query the current socket sendability status.
+ */
+static int raw_sendbuffer(void)
+{
+    return raw_bufsize;
 }
 
 /*
@@ -120,6 +134,11 @@ static int raw_sendok(void)
     return 1;
 }
 
+static void raw_unthrottle(int backlog)
+{
+    sk_set_frozen(s, backlog > RAW_MAX_BACKLOG);
+}
+
 static int raw_ldisc(int option)
 {
     if (option == LD_EDIT || option == LD_ECHO)
@@ -130,10 +149,12 @@ static int raw_ldisc(int option)
 Backend raw_backend = {
     raw_init,
     raw_send,
+    raw_sendbuffer,
     raw_size,
     raw_special,
     raw_socket,
     raw_sendok,
     raw_ldisc,
+    raw_unthrottle,
     1
 };
index 8010852..5c55d5a 100644 (file)
--- a/rlogin.c
+++ b/rlogin.c
 #define TRUE 1
 #endif
 
+#define RLOGIN_MAX_BACKLOG 4096
+
 static Socket s = NULL;
+static int rlogin_bufsize;
 
 static void rlogin_size(void);
 
 static void c_write(char *buf, int len)
 {
-    from_backend(0, buf, len);
+    int backlog = from_backend(0, buf, len);
+    sk_set_frozen(s, backlog > RLOGIN_MAX_BACKLOG);
 }
 
 static int rlogin_closing(Plug plug, char *error_msg, int error_code,
@@ -122,7 +126,7 @@ static char *rlogin_init(char *host, int port, char **realhost)
        sk_write(s, "/", 1);
        for (p = cfg.termspeed; isdigit(*p); p++);
        sk_write(s, cfg.termspeed, p - cfg.termspeed);
-       sk_write(s, &z, 1);
+       rlogin_bufsize = sk_write(s, &z, 1);
     }
 
     return NULL;
@@ -131,13 +135,23 @@ static char *rlogin_init(char *host, int port, char **realhost)
 /*
  * Called to send data down the rlogin connection.
  */
-static void rlogin_send(char *buf, int len)
+static int rlogin_send(char *buf, int len)
 {
 
     if (s == NULL)
        return;
 
-    sk_write(s, buf, len);
+    rlogin_bufsize = sk_write(s, buf, len);
+
+    return rlogin_bufsize;
+}
+
+/*
+ * Called to query the current socket sendability status.
+ */
+static int rlogin_sendbuffer(void)
+{
+    return rlogin_bufsize;
 }
 
 /*
@@ -151,7 +165,7 @@ static void rlogin_size(void)
     b[7] = cols & 0xFF;
     b[4] = rows >> 8;
     b[5] = rows & 0xFF;
-    sk_write(s, b, 12);
+    rlogin_bufsize = sk_write(s, b, 12);
     return;
 }
 
@@ -174,6 +188,11 @@ static int rlogin_sendok(void)
     return 1;
 }
 
+static void rlogin_unthrottle(int backlog)
+{
+    sk_set_frozen(s, backlog > RLOGIN_MAX_BACKLOG);
+}
+
 static int rlogin_ldisc(int option)
 {
     return 0;
@@ -182,10 +201,12 @@ static int rlogin_ldisc(int option)
 Backend rlogin_backend = {
     rlogin_init,
     rlogin_send,
+    rlogin_sendbuffer,
     rlogin_size,
     rlogin_special,
     rlogin_socket,
     rlogin_sendok,
     rlogin_ldisc,
+    rlogin_unthrottle,
     1
 };
diff --git a/scp.c b/scp.c
index 3e6fcf2..b00500c 100644 (file)
--- a/scp.c
+++ b/scp.c
@@ -75,6 +75,12 @@ static void tell_user(FILE * stream, char *fmt, ...);
 static void gui_update_stats(char *name, unsigned long size,
                             int percentage, unsigned long elapsed);
 
+/*
+ * The maximum amount of queued data we accept before we stop and
+ * wait for the server to process some.
+ */
+#define MAX_SCP_BUFSIZE 16384
+
 void logevent(char *string)
 {
 }
@@ -309,7 +315,7 @@ static unsigned char *outptr;              /* where to put the data */
 static unsigned outlen;                       /* how much data required */
 static unsigned char *pending = NULL;  /* any spare data */
 static unsigned pendlen = 0, pendsize = 0;     /* length and phys. size of buffer */
-void from_backend(int is_stderr, char *data, int datalen)
+int from_backend(int is_stderr, char *data, int datalen)
 {
     unsigned char *p = (unsigned char *) data;
     unsigned len = (unsigned) datalen;
@@ -320,7 +326,7 @@ void from_backend(int is_stderr, char *data, int datalen)
      */
     if (is_stderr) {
        fwrite(data, 1, len, stderr);
-       return;
+       return 0;
     }
 
     inbuf_head = 0;
@@ -329,7 +335,7 @@ void from_backend(int is_stderr, char *data, int datalen)
      * If this is before the real session begins, just return.
      */
     if (!outptr)
-       return;
+       return 0;
 
     if (outlen > 0) {
        unsigned used = outlen;
@@ -353,6 +359,19 @@ void from_backend(int is_stderr, char *data, int datalen)
        memcpy(pending + pendlen, p, len);
        pendlen += len;
     }
+
+    return 0;
+}
+static int scp_process_network_event(void)
+{
+    fd_set readfds;
+
+    FD_ZERO(&readfds);
+    FD_SET(scp_ssh_socket, &readfds);
+    if (select(1, &readfds, NULL, NULL, NULL) < 0)
+       return 0;                      /* doom */
+    select_result((WPARAM) scp_ssh_socket, (LPARAM) FD_READ);
+    return 1;
 }
 static int ssh_scp_recv(unsigned char *buf, int len)
 {
@@ -382,13 +401,8 @@ static int ssh_scp_recv(unsigned char *buf, int len)
     }
 
     while (outlen > 0) {
-       fd_set readfds;
-
-       FD_ZERO(&readfds);
-       FD_SET(scp_ssh_socket, &readfds);
-       if (select(1, &readfds, NULL, NULL, NULL) < 0)
+       if (!scp_process_network_event())
            return 0;                  /* doom */
-       select_result((WPARAM) scp_ssh_socket, (LPARAM) FD_READ);
     }
 
     return len;
@@ -759,6 +773,8 @@ static void source(char *src)
     for (i = 0; i < size; i += 4096) {
        char transbuf[4096];
        DWORD j, k = 4096;
+       int bufsize;
+
        if (i + k > size)
            k = size - i;
        if (!ReadFile(f, transbuf, k, &j, NULL) || j != k) {
@@ -766,7 +782,7 @@ static void source(char *src)
                printf("\n");
            bump("%s: Read error", src);
        }
-       back->send(transbuf, k);
+       bufsize = back->send(transbuf, k);
        if (statistics) {
            stat_bytes += k;
            if (time(NULL) != stat_lasttime || i + k == size) {
@@ -775,6 +791,18 @@ static void source(char *src)
                            stat_starttime, stat_lasttime);
            }
        }
+
+       /*
+        * If the network transfer is backing up - that is, the
+        * remote site is not accepting data as fast as we can
+        * produce it - then we must loop on network events until
+        * we have space in the buffer again.
+        */
+       while (bufsize > MAX_SCP_BUFSIZE) {
+           if (!scp_process_network_event())
+               bump("%s: Network error occurred", src);
+           bufsize = back->sendbuffer();
+       }
     }
     CloseHandle(f);
 
diff --git a/ssh.c b/ssh.c
index a639019..be34659 100644 (file)
--- a/ssh.c
+++ b/ssh.c
@@ -195,14 +195,43 @@ enum { PKT_END, PKT_INT, PKT_CHAR, PKT_DATA, PKT_STR, PKT_BIGNUM };
 
 extern char *x11_init(Socket *, char *, void *);
 extern void x11_close(Socket);
-extern void x11_send(Socket, char *, int);
+extern int x11_send(Socket, char *, int);
 extern void x11_invent_auth(char *, int, char *, int);
+extern void x11_unthrottle(Socket s);
+extern void x11_override_throttle(Socket s, int enable);
 
 extern char *pfd_newconnect(Socket * s, char *hostname, int port, void *c);
 extern char *pfd_addforward(char *desthost, int destport, int port);
 extern void pfd_close(Socket s);
-extern void pfd_send(Socket s, char *data, int len);
+extern int pfd_send(Socket s, char *data, int len);
 extern void pfd_confirm(Socket s);
+extern void pfd_unthrottle(Socket s);
+extern void pfd_override_throttle(Socket s, int enable);
+
+/*
+ * Buffer management constants. There are several of these for
+ * various different purposes:
+ * 
+ *  - SSH1_BUFFER_LIMIT is the amount of backlog that must build up
+ *    on a local data stream before we throttle the whole SSH
+ *    connection (in SSH1 only). Throttling the whole connection is
+ *    pretty drastic so we set this high in the hope it won't
+ *    happen very often.
+ * 
+ *  - SSH_MAX_BACKLOG is the amount of backlog that must build up
+ *    on the SSH connection itself before we defensively throttle
+ *    _all_ local data streams. This is pretty drastic too (though
+ *    thankfully unlikely in SSH2 since the window mechanism should
+ *    ensure that the server never has any need to throttle its end
+ *    of the connection), so we set this high as well.
+ * 
+ *  - OUR_V2_WINSIZE is the maximum window size we present on SSH2
+ *    channels.
+ */
+
+#define SSH1_BUFFER_LIMIT 32768
+#define SSH_MAX_BACKLOG 32768
+#define OUR_V2_WINSIZE 16384
 
 /*
  * Ciphers for SSH2. We miss out single-DES because it isn't
@@ -282,11 +311,16 @@ struct ssh_channel {
     unsigned remoteid, localid;
     int type;
     int closes;
-    struct ssh2_data_channel {
-       unsigned char *outbuffer;
-       unsigned outbuflen, outbufsize;
-       unsigned remwindow, remmaxpkt;
-    } v2;
+    union {
+       struct ssh1_data_channel {
+           int throttling;
+       } v1;
+       struct ssh2_data_channel {
+           bufchain outbuffer;
+           unsigned remwindow, remmaxpkt;
+           unsigned locwindow;
+       } v2;
+    } v;
     union {
        struct ssh_agent_channel {
            unsigned char *message;
@@ -395,16 +429,22 @@ static unsigned char *deferred_send_data = NULL;
 static int deferred_len = 0, deferred_size = 0;
 
 static int ssh_version;
+static int ssh1_throttle_count;
+static int ssh_overall_bufsize;
+static int ssh_throttled_all;
+static int ssh1_stdout_throttling;
 static void (*ssh_protocol) (unsigned char *in, int inlen, int ispkt);
 static void ssh1_protocol(unsigned char *in, int inlen, int ispkt);
 static void ssh2_protocol(unsigned char *in, int inlen, int ispkt);
 static void ssh_size(void);
 static void ssh_special(Telnet_Special);
-static void ssh2_try_send(struct ssh_channel *c);
+static int ssh2_try_send(struct ssh_channel *c);
 static void ssh2_add_channel_data(struct ssh_channel *c, char *buf,
                                  int len);
-
+static void ssh_throttle_all(int enable, int bufsize);
+static void ssh2_set_window(struct ssh_channel *c, unsigned newwin);
 static int (*s_rdpkt) (unsigned char **data, int *datalen);
+static int ssh_sendbuffer(void);
 
 static struct rdpkt1_state_tag {
     long len, pad, biglen, to_read;
@@ -936,9 +976,11 @@ static int s_wrpkt_prepare(void)
 
 static void s_wrpkt(void)
 {
-    int len;
+    int len, backlog;
     len = s_wrpkt_prepare();
-    sk_write(s, pktout.data, len);
+    backlog = sk_write(s, pktout.data, len);
+    if (backlog > SSH_MAX_BACKLOG)
+       ssh_throttle_all(1, backlog);
 }
 
 static void s_wrpkt_defer(void)
@@ -1244,8 +1286,12 @@ static int ssh2_pkt_construct(void)
  */
 static void ssh2_pkt_send(void)
 {
-    int len = ssh2_pkt_construct();
-    sk_write(s, pktout.data, len);
+    int len;
+    int backlog;
+    len = ssh2_pkt_construct();
+    backlog = sk_write(s, pktout.data, len);
+    if (backlog > SSH_MAX_BACKLOG)
+       ssh_throttle_all(1, backlog);
 }
 
 /*
@@ -1276,10 +1322,13 @@ static void ssh2_pkt_defer(void)
  */
 static void ssh_pkt_defersend(void)
 {
-    sk_write(s, deferred_send_data, deferred_len);
+    int backlog;
+    backlog = sk_write(s, deferred_send_data, deferred_len);
     deferred_len = deferred_size = 0;
     sfree(deferred_send_data);
     deferred_send_data = NULL;
+    if (backlog > SSH_MAX_BACKLOG)
+       ssh_throttle_all(1, backlog);
 }
 
 #if 0
@@ -1579,6 +1628,16 @@ static int ssh_receive(Plug plug, int urgent, char *data, int len)
     return 1;
 }
 
+static void ssh_sent(Plug plug, int bufsize)
+{
+    /*
+     * If the send backlog on the SSH socket itself clears, we
+     * should unthrottle the whole world if it was throttled.
+     */
+    if (bufsize < SSH_MAX_BACKLOG)
+       ssh_throttle_all(0, bufsize);
+}
+
 /*
  * Connect to specified host and port.
  * Returns an error message, or NULL on success.
@@ -1589,7 +1648,9 @@ static char *connect_to_host(char *host, int port, char **realhost)
 {
     static struct plug_function_table fn_table = {
        ssh_closing,
-       ssh_receive
+       ssh_receive,
+       ssh_sent,
+       NULL
     }, *fn_table_ptr = &fn_table;
 
     SockAddr addr;
@@ -1647,6 +1708,56 @@ static char *connect_to_host(char *host, int port, char **realhost)
 }
 
 /*
+ * Throttle or unthrottle the SSH connection.
+ */
+static void ssh1_throttle(int adjust)
+{
+    int old_count = ssh1_throttle_count;
+    ssh1_throttle_count += adjust;
+    assert(ssh1_throttle_count >= 0);
+    if (ssh1_throttle_count && !old_count) {
+       sk_set_frozen(s, 1);
+    } else if (!ssh1_throttle_count && old_count) {
+       sk_set_frozen(s, 0);
+    }
+}
+
+/*
+ * Throttle or unthrottle _all_ local data streams (for when sends
+ * on the SSH connection itself back up).
+ */
+static void ssh_throttle_all(int enable, int bufsize)
+{
+    int i;
+    struct ssh_channel *c;
+
+    if (enable == ssh_throttled_all)
+       return;
+    ssh_throttled_all = enable;
+    ssh_overall_bufsize = bufsize;
+    if (!ssh_channels)
+       return;
+    for (i = 0; NULL != (c = index234(ssh_channels, i)); i++) {
+       switch (c->type) {
+         case CHAN_MAINSESSION:
+           /*
+            * This is treated separately, outside the switch.
+            */
+           break;
+         case CHAN_X11:
+           x11_override_throttle(c->u.x11.s, enable);
+           break;
+         case CHAN_AGENT:
+           /* Agent channels require no buffer management. */
+           break;
+         case CHAN_SOCKDATA:
+           pfd_override_throttle(c->u.x11.s, enable);
+           break;
+       }
+    }
+}
+
+/*
  * Handle the key exchange and user authentication phases.
  */
 static int do_ssh1_login(unsigned char *in, int inlen, int ispkt)
@@ -2346,15 +2457,35 @@ void sshfwd_close(struct ssh_channel *c)
     }
 }
 
-void sshfwd_write(struct ssh_channel *c, char *buf, int len)
+int sshfwd_write(struct ssh_channel *c, char *buf, int len)
 {
     if (ssh_version == 1) {
        send_packet(SSH1_MSG_CHANNEL_DATA,
                    PKT_INT, c->remoteid,
                    PKT_INT, len, PKT_DATA, buf, len, PKT_END);
+       /*
+        * In SSH1 we can return 0 here - implying that forwarded
+        * connections are never individually throttled - because
+        * the only circumstance that can cause throttling will be
+        * the whole SSH connection backing up, in which case
+        * _everything_ will be throttled as a whole.
+        */
+       return 0;
     } else {
        ssh2_add_channel_data(c, buf, len);
-       ssh2_try_send(c);
+       return ssh2_try_send(c);
+    }
+}
+
+void sshfwd_unthrottle(struct ssh_channel *c, int bufsize)
+{
+    if (ssh_version == 1) {
+       if (c->v.v1.throttling && bufsize < SSH1_BUFFER_LIMIT) {
+           c->v.v1.throttling = 0;
+           ssh1_throttle(-1);
+       }
+    } else {
+       ssh2_set_window(c, OUR_V2_WINSIZE - bufsize);
     }
 }
 
@@ -2540,8 +2671,13 @@ static void ssh1_protocol(unsigned char *in, int inlen, int ispkt)
            if (pktin.type == SSH1_SMSG_STDOUT_DATA ||
                pktin.type == SSH1_SMSG_STDERR_DATA) {
                long len = GET_32BIT(pktin.body);
-               from_backend(pktin.type == SSH1_SMSG_STDERR_DATA,
-                            pktin.body + 4, len);
+               int bufsize =
+                   from_backend(pktin.type == SSH1_SMSG_STDERR_DATA,
+                                pktin.body + 4, len);
+               if (bufsize > SSH1_BUFFER_LIMIT) {
+                   ssh1_stdout_throttling = 1;
+                   ssh1_throttle(+1);
+               }
            } else if (pktin.type == SSH1_MSG_DISCONNECT) {
                ssh_state = SSH_STATE_CLOSED;
                logevent("Received disconnect request");
@@ -2572,6 +2708,7 @@ static void ssh1_protocol(unsigned char *in, int inlen, int ispkt)
                        c->remoteid = GET_32BIT(pktin.body);
                        c->localid = alloc_channel_id();
                        c->closes = 0;
+                       c->v.v1.throttling = 0;
                        c->type = CHAN_X11;     /* identify channel type */
                        add234(ssh_channels, c);
                        send_packet(SSH1_MSG_CHANNEL_OPEN_CONFIRMATION,
@@ -2594,6 +2731,7 @@ static void ssh1_protocol(unsigned char *in, int inlen, int ispkt)
                    c->remoteid = GET_32BIT(pktin.body);
                    c->localid = alloc_channel_id();
                    c->closes = 0;
+                   c->v.v1.throttling = 0;
                    c->type = CHAN_AGENT;       /* identify channel type */
                    c->u.a.lensofar = 0;
                    add234(ssh_channels, c);
@@ -2646,6 +2784,7 @@ static void ssh1_protocol(unsigned char *in, int inlen, int ispkt)
                        c->remoteid = GET_32BIT(pktin.body);
                        c->localid = alloc_channel_id();
                        c->closes = 0;
+                       c->v.v1.throttling = 0;
                        c->type = CHAN_SOCKDATA;        /* identify channel type */
                        add234(ssh_channels, c);
                        send_packet(SSH1_MSG_CHANNEL_OPEN_CONFIRMATION,
@@ -2706,12 +2845,13 @@ static void ssh1_protocol(unsigned char *in, int inlen, int ispkt)
                struct ssh_channel *c;
                c = find234(ssh_channels, &i, ssh_channelfind);
                if (c) {
+                   int bufsize;
                    switch (c->type) {
                      case CHAN_X11:
-                       x11_send(c->u.x11.s, p, len);
+                       bufsize = x11_send(c->u.x11.s, p, len);
                        break;
                      case CHAN_SOCKDATA:
-                       pfd_send(c->u.pfd.s, p, len);
+                       bufsize = pfd_send(c->u.pfd.s, p, len);
                        break;
                      case CHAN_AGENT:
                        /* Data for an agent message. Buffer it. */
@@ -2764,8 +2904,13 @@ static void ssh1_protocol(unsigned char *in, int inlen, int ispkt)
                                c->u.a.lensofar = 0;
                            }
                        }
+                       bufsize = 0;   /* agent channels never back up */
                        break;
                    }
+                   if (bufsize > SSH1_BUFFER_LIMIT) {
+                       c->v.v1.throttling = 1;
+                       ssh1_throttle(+1);
+                   }
                }
            } else if (pktin.type == SSH1_SMSG_SUCCESS) {
                /* may be from EXEC_SHELL on some servers */
@@ -3272,33 +3417,49 @@ static int do_ssh2_transport(unsigned char *in, int inlen, int ispkt)
 static void ssh2_add_channel_data(struct ssh_channel *c, char *buf,
                                  int len)
 {
-    if (c->v2.outbufsize < c->v2.outbuflen + len) {
-       c->v2.outbufsize = c->v2.outbuflen + len + 1024;
-       c->v2.outbuffer = srealloc(c->v2.outbuffer, c->v2.outbufsize);
-    }
-    memcpy(c->v2.outbuffer + c->v2.outbuflen, buf, len);
-    c->v2.outbuflen += len;
+    bufchain_add(&c->v.v2.outbuffer, buf, len);
 }
 
 /*
  * Attempt to send data on an SSH2 channel.
  */
-static void ssh2_try_send(struct ssh_channel *c)
+static int ssh2_try_send(struct ssh_channel *c)
 {
-    while (c->v2.remwindow > 0 && c->v2.outbuflen > 0) {
-       unsigned len = c->v2.remwindow;
-       if (len > c->v2.outbuflen)
-           len = c->v2.outbuflen;
-       if (len > c->v2.remmaxpkt)
-           len = c->v2.remmaxpkt;
+    while (c->v.v2.remwindow > 0 && bufchain_size(&c->v.v2.outbuffer) > 0) {
+       int len;
+       void *data;
+       bufchain_prefix(&c->v.v2.outbuffer, &data, &len);
+       if ((unsigned)len > c->v.v2.remwindow)
+           len = c->v.v2.remwindow;
+       if ((unsigned)len > c->v.v2.remmaxpkt)
+           len = c->v.v2.remmaxpkt;
        ssh2_pkt_init(SSH2_MSG_CHANNEL_DATA);
        ssh2_pkt_adduint32(c->remoteid);
        ssh2_pkt_addstring_start();
-       ssh2_pkt_addstring_data(c->v2.outbuffer, len);
+       ssh2_pkt_addstring_data(data, len);
        ssh2_pkt_send();
-       c->v2.outbuflen -= len;
-       memmove(c->v2.outbuffer, c->v2.outbuffer + len, c->v2.outbuflen);
-       c->v2.remwindow -= len;
+       bufchain_consume(&c->v.v2.outbuffer, len);
+       c->v.v2.remwindow -= len;
+    }
+
+    /*
+     * After having sent as much data as we can, return the amount
+     * still buffered.
+     */
+    return bufchain_size(&c->v.v2.outbuffer);
+}
+
+/*
+ * Potentially enlarge the window on an SSH2 channel.
+ */
+static void ssh2_set_window(struct ssh_channel *c, unsigned newwin)
+{
+    if (newwin > c->v.v2.locwindow) {
+       ssh2_pkt_init(SSH2_MSG_CHANNEL_WINDOW_ADJUST);
+       ssh2_pkt_adduint32(c->remoteid);
+       ssh2_pkt_adduint32(newwin - c->v.v2.locwindow);
+       ssh2_pkt_send();
+       c->v.v2.locwindow = newwin;
     }
 }
 
@@ -4047,7 +4208,8 @@ static void do_ssh2_authconn(unsigned char *in, int inlen, int ispkt)
     ssh2_pkt_init(SSH2_MSG_CHANNEL_OPEN);
     ssh2_pkt_addstring("session");
     ssh2_pkt_adduint32(mainchan->localid);
-    ssh2_pkt_adduint32(0x8000UL);      /* our window size */
+    mainchan->v.v2.locwindow = OUR_V2_WINSIZE;
+    ssh2_pkt_adduint32(mainchan->v.v2.locwindow);      /* our window size */
     ssh2_pkt_adduint32(0x4000UL);      /* our max pkt size */
     ssh2_pkt_send();
     crWaitUntilV(ispkt);
@@ -4063,10 +4225,9 @@ static void do_ssh2_authconn(unsigned char *in, int inlen, int ispkt)
     mainchan->remoteid = ssh2_pkt_getuint32();
     mainchan->type = CHAN_MAINSESSION;
     mainchan->closes = 0;
-    mainchan->v2.remwindow = ssh2_pkt_getuint32();
-    mainchan->v2.remmaxpkt = ssh2_pkt_getuint32();
-    mainchan->v2.outbuffer = NULL;
-    mainchan->v2.outbuflen = mainchan->v2.outbufsize = 0;
+    mainchan->v.v2.remwindow = ssh2_pkt_getuint32();
+    mainchan->v.v2.remmaxpkt = ssh2_pkt_getuint32();
+    bufchain_init(&mainchan->v.v2.outbuffer);
     add234(ssh_channels, mainchan);
     logevent("Opened channel for session");
 
@@ -4095,7 +4256,7 @@ static void do_ssh2_authconn(unsigned char *in, int inlen, int ispkt)
                c = find234(ssh_channels, &i, ssh_channelfind);
                if (!c)
                    continue;          /* nonexistent channel */
-               c->v2.remwindow += ssh2_pkt_getuint32();
+               c->v.v2.remwindow += ssh2_pkt_getuint32();
            }
        } while (pktin.type == SSH2_MSG_CHANNEL_WINDOW_ADJUST);
 
@@ -4183,7 +4344,7 @@ static void do_ssh2_authconn(unsigned char *in, int inlen, int ispkt)
                                c = find234(ssh_channels, &i, ssh_channelfind);
                                if (!c)
                                    continue;/* nonexistent channel */
-                               c->v2.remwindow += ssh2_pkt_getuint32();
+                               c->v.v2.remwindow += ssh2_pkt_getuint32();
                            }
                        } while (pktin.type == SSH2_MSG_CHANNEL_WINDOW_ADJUST);
 
@@ -4221,7 +4382,7 @@ static void do_ssh2_authconn(unsigned char *in, int inlen, int ispkt)
                c = find234(ssh_channels, &i, ssh_channelfind);
                if (!c)
                    continue;          /* nonexistent channel */
-               c->v2.remwindow += ssh2_pkt_getuint32();
+               c->v.v2.remwindow += ssh2_pkt_getuint32();
            }
        } while (pktin.type == SSH2_MSG_CHANNEL_WINDOW_ADJUST);
 
@@ -4264,7 +4425,7 @@ static void do_ssh2_authconn(unsigned char *in, int inlen, int ispkt)
                c = find234(ssh_channels, &i, ssh_channelfind);
                if (!c)
                    continue;          /* nonexistent channel */
-               c->v2.remwindow += ssh2_pkt_getuint32();
+               c->v.v2.remwindow += ssh2_pkt_getuint32();
            }
        } while (pktin.type == SSH2_MSG_CHANNEL_WINDOW_ADJUST);
 
@@ -4308,7 +4469,7 @@ static void do_ssh2_authconn(unsigned char *in, int inlen, int ispkt)
            c = find234(ssh_channels, &i, ssh_channelfind);
            if (!c)
                continue;              /* nonexistent channel */
-           c->v2.remwindow += ssh2_pkt_getuint32();
+           c->v.v2.remwindow += ssh2_pkt_getuint32();
        }
     } while (pktin.type == SSH2_MSG_CHANNEL_WINDOW_ADJUST);
     if (pktin.type != SSH2_MSG_CHANNEL_SUCCESS) {
@@ -4352,17 +4513,20 @@ static void do_ssh2_authconn(unsigned char *in, int inlen, int ispkt)
                    continue;          /* extended but not stderr */
                ssh2_pkt_getstring(&data, &length);
                if (data) {
+                   int bufsize;
+                   c->v.v2.locwindow -= length;
                    switch (c->type) {
                      case CHAN_MAINSESSION:
-                       from_backend(pktin.type ==
-                                    SSH2_MSG_CHANNEL_EXTENDED_DATA, data,
-                                    length);
+                       bufsize =
+                           from_backend(pktin.type ==
+                                        SSH2_MSG_CHANNEL_EXTENDED_DATA,
+                                        data, length);
                        break;
                      case CHAN_X11:
-                       x11_send(c->u.x11.s, data, length);
+                       bufsize = x11_send(c->u.x11.s, data, length);
                        break;
                      case CHAN_SOCKDATA:
-                       pfd_send(c->u.pfd.s, data, length);
+                       bufsize = pfd_send(c->u.pfd.s, data, length);
                        break;
                      case CHAN_AGENT:
                        while (length > 0) {
@@ -4412,17 +4576,15 @@ static void do_ssh2_authconn(unsigned char *in, int inlen, int ispkt)
                                c->u.a.lensofar = 0;
                            }
                        }
+                       bufsize = 0;
                        break;
                    }
                    /*
-                    * Enlarge the window again at the remote
-                    * side, just in case it ever runs down and
-                    * they fail to send us any more data.
+                    * If we are not buffering too much data,
+                    * enlarge the window again at the remote side.
                     */
-                   ssh2_pkt_init(SSH2_MSG_CHANNEL_WINDOW_ADJUST);
-                   ssh2_pkt_adduint32(c->remoteid);
-                   ssh2_pkt_adduint32(length);
-                   ssh2_pkt_send();
+                   if (bufsize < OUR_V2_WINSIZE)
+                       ssh2_set_window(c, OUR_V2_WINSIZE - bufsize);
                }
            } else if (pktin.type == SSH2_MSG_DISCONNECT) {
                ssh_state = SSH_STATE_CLOSED;
@@ -4475,7 +4637,7 @@ static void do_ssh2_authconn(unsigned char *in, int inlen, int ispkt)
                    break;
                }
                del234(ssh_channels, c);
-               sfree(c->v2.outbuffer);
+               bufchain_clear(&c->v.v2.outbuffer);
                sfree(c);
 
                /*
@@ -4498,7 +4660,7 @@ static void do_ssh2_authconn(unsigned char *in, int inlen, int ispkt)
                c = find234(ssh_channels, &i, ssh_channelfind);
                if (!c)
                    continue;          /* nonexistent channel */
-               c->v2.remwindow += ssh2_pkt_getuint32();
+               c->v.v2.remwindow += ssh2_pkt_getuint32();
                try_send = TRUE;
            } else if (pktin.type == SSH2_MSG_CHANNEL_OPEN_CONFIRMATION) {
                unsigned i = ssh2_pkt_getuint32();
@@ -4511,10 +4673,9 @@ static void do_ssh2_authconn(unsigned char *in, int inlen, int ispkt)
                c->remoteid = ssh2_pkt_getuint32();
                c->type = CHAN_SOCKDATA;
                c->closes = 0;
-               c->v2.remwindow = ssh2_pkt_getuint32();
-               c->v2.remmaxpkt = ssh2_pkt_getuint32();
-               c->v2.outbuffer = NULL;
-               c->v2.outbuflen = c->v2.outbufsize = 0;
+               c->v.v2.remwindow = ssh2_pkt_getuint32();
+               c->v.v2.remmaxpkt = ssh2_pkt_getuint32();
+               bufchain_init(&c->v.v2.outbuffer);
                pfd_confirm(c->u.pfd.s);
            } else if (pktin.type == SSH2_MSG_CHANNEL_OPEN) {
                char *type;
@@ -4588,15 +4749,15 @@ static void do_ssh2_authconn(unsigned char *in, int inlen, int ispkt)
                } else {
                    c->localid = alloc_channel_id();
                    c->closes = 0;
-                   c->v2.remwindow = winsize;
-                   c->v2.remmaxpkt = pktsize;
-                   c->v2.outbuffer = NULL;
-                   c->v2.outbuflen = c->v2.outbufsize = 0;
+                   c->v.v2.locwindow = OUR_V2_WINSIZE;
+                   c->v.v2.remwindow = winsize;
+                   c->v.v2.remmaxpkt = pktsize;
+                   bufchain_init(&c->v.v2.outbuffer);
                    add234(ssh_channels, c);
                    ssh2_pkt_init(SSH2_MSG_CHANNEL_OPEN_CONFIRMATION);
                    ssh2_pkt_adduint32(c->remoteid);
                    ssh2_pkt_adduint32(c->localid);
-                   ssh2_pkt_adduint32(0x8000UL);       /* our window size */
+                   ssh2_pkt_adduint32(c->v.v2.locwindow);
                    ssh2_pkt_adduint32(0x4000UL);       /* our max pkt size */
                    ssh2_pkt_send();
                }
@@ -4617,8 +4778,27 @@ static void do_ssh2_authconn(unsigned char *in, int inlen, int ispkt)
            /*
             * Try to send data on all channels if we can.
             */
-           for (i = 0; NULL != (c = index234(ssh_channels, i)); i++)
-               ssh2_try_send(c);
+           for (i = 0; NULL != (c = index234(ssh_channels, i)); i++) {
+               int bufsize = ssh2_try_send(c);
+               if (bufsize == 0) {
+                   switch (c->type) {
+                     case CHAN_MAINSESSION:
+                       /* stdin need not receive an unthrottle
+                        * notification since it will be polled */
+                       break;
+                     case CHAN_X11:
+                       x11_unthrottle(c->u.x11.s);
+                       break;
+                     case CHAN_AGENT:
+                       /* agent sockets are request/response and need no
+                        * buffer management */
+                       break;
+                     case CHAN_SOCKDATA:
+                       pfd_unthrottle(c->u.pfd.s);
+                       break;
+                   }
+               }
+           }
        }
     }
 
@@ -4652,6 +4832,8 @@ static char *ssh_init(char *host, int port, char **realhost)
     ssh_send_ok = 0;
     ssh_editing = 0;
     ssh_echoing = 0;
+    ssh1_throttle_count = 0;
+    ssh_overall_bufsize = 0;
 
     p = connect_to_host(host, port, realhost);
     if (p != NULL)
@@ -4663,12 +4845,44 @@ static char *ssh_init(char *host, int port, char **realhost)
 /*
  * Called to send data down the Telnet connection.
  */
-static void ssh_send(char *buf, int len)
+static int ssh_send(char *buf, int len)
 {
     if (s == NULL || ssh_protocol == NULL)
-       return;
+       return 0;
 
     ssh_protocol(buf, len, 0);
+
+    return ssh_sendbuffer();
+}
+
+/*
+ * Called to query the current amount of buffered stdin data.
+ */
+static int ssh_sendbuffer(void)
+{
+    int override_value;
+
+    if (s == NULL || ssh_protocol == NULL)
+       return 0;
+
+    /*
+     * If the SSH socket itself has backed up, add the total backup
+     * size on that to any individual buffer on the stdin channel.
+     */
+    override_value = 0;
+    if (ssh_throttled_all)
+       override_value = ssh_overall_bufsize;
+
+    if (ssh_version == 1) {
+       return override_value;
+    } else if (ssh_version == 2) {
+       if (!mainchan || mainchan->closes > 0)
+           return override_value;
+       else
+           return override_value + bufchain_size(&mainchan->v.v2.outbuffer);
+    }
+
+    return 0;
 }
 
 /*
@@ -4762,6 +4976,23 @@ void *new_sock_channel(Socket s)
     return c;
 }
 
+/*
+ * This is called when stdout/stderr (the entity to which
+ * from_backend sends data) manages to clear some backlog.
+ */
+void ssh_unthrottle(int bufsize)
+{
+    if (ssh_version == 1) {
+       if (bufsize < SSH1_BUFFER_LIMIT) {
+           ssh1_stdout_throttling = 0;
+           ssh1_throttle(-1);
+       }
+    } else {
+       if (mainchan && mainchan->closes == 0)
+           ssh2_set_window(mainchan, OUR_V2_WINSIZE - bufsize);
+    }
+}
+
 void ssh_send_port_open(void *channel, char *hostname, int port, char *org)
 {
     struct ssh_channel *c = (struct ssh_channel *)channel;
@@ -4781,7 +5012,8 @@ void ssh_send_port_open(void *channel, char *hostname, int port, char *org)
        ssh2_pkt_init(SSH2_MSG_CHANNEL_OPEN);
        ssh2_pkt_addstring("direct-tcpip");
        ssh2_pkt_adduint32(c->localid);
-       ssh2_pkt_adduint32(0x8000UL);      /* our window size */
+       c->v.v2.locwindow = OUR_V2_WINSIZE;
+       ssh2_pkt_adduint32(c->v.v2.locwindow);/* our window size */
        ssh2_pkt_adduint32(0x4000UL);      /* our max pkt size */
        ssh2_pkt_addstring(hostname);
        ssh2_pkt_adduint32(port);
@@ -4820,10 +5052,12 @@ static int ssh_ldisc(int option)
 Backend ssh_backend = {
     ssh_init,
     ssh_send,
+    ssh_sendbuffer,
     ssh_size,
     ssh_special,
     ssh_socket,
     ssh_sendok,
     ssh_ldisc,
+    ssh_unthrottle,
     22
 };
diff --git a/ssh.h b/ssh.h
index d7b5b16..8097624 100644 (file)
--- a/ssh.h
+++ b/ssh.h
@@ -3,6 +3,10 @@
 #include "puttymem.h"
 #include "network.h"
 
+extern void sshfwd_close(struct ssh_channel *c);
+extern int sshfwd_write(struct ssh_channel *c, char *, int);
+extern void sshfwd_unthrottle(struct ssh_channel *c, int bufsize);
+
 /*
  * Useful thing.
  */
index 04cbec0..40d032c 100644 (file)
--- a/telnet.c
+++ b/telnet.c
@@ -174,9 +174,11 @@ static struct Opt *opts[] = {
     &o_we_sga, &o_they_sga, NULL
 };
 
+#define TELNET_MAX_BACKLOG 4096
+
 static int echoing = TRUE, editing = TRUE;
 static int activated = FALSE;
-
+static int telnet_bufsize;
 static int in_synch;
 static int sb_opt, sb_len;
 static char *sb_buf = NULL;
@@ -185,8 +187,10 @@ static int sb_size = 0;
 
 static void c_write1(int c)
 {
+    int backlog;
     char cc = (char) c;
-    from_backend(0, &cc, 1);
+    backlog = from_backend(0, &cc, 1);
+    sk_set_frozen(s, backlog > TELNET_MAX_BACKLOG);
 }
 
 static void log_option(char *sender, int cmd, int option)
@@ -206,7 +210,7 @@ static void send_opt(int cmd, int option)
     b[0] = IAC;
     b[1] = cmd;
     b[2] = option;
-    sk_write(s, b, 3);
+    telnet_bufsize = sk_write(s, b, 3);
     log_option("client", cmd, option);
 }
 
@@ -340,7 +344,7 @@ static void process_subneg(void)
            n = 4 + strlen(cfg.termspeed);
            b[n] = IAC;
            b[n + 1] = SE;
-           sk_write(s, b, n + 2);
+           telnet_bufsize = sk_write(s, b, n + 2);
            logevent("server:\tSB TSPEED SEND");
            sprintf(logbuf, "client:\tSB TSPEED IS %s", cfg.termspeed);
            logevent(logbuf);
@@ -361,7 +365,7 @@ static void process_subneg(void)
                            'a' : cfg.termtype[n]);
            b[n + 4] = IAC;
            b[n + 5] = SE;
-           sk_write(s, b, n + 6);
+           telnet_bufsize = sk_write(s, b, n + 6);
            b[n + 4] = 0;
            logevent("server:\tSB TTYPE SEND");
            sprintf(logbuf, "client:\tSB TTYPE IS %s", b + 4);
@@ -437,7 +441,7 @@ static void process_subneg(void)
            }
            b[n++] = IAC;
            b[n++] = SE;
-           sk_write(s, b, n);
+           telnet_bufsize = sk_write(s, b, n);
            sprintf(logbuf, "client:\tSB %s IS %s", telopt(sb_opt),
                    n == 6 ? "<nothing>" : "<stuff>");
            logevent(logbuf);
@@ -650,7 +654,7 @@ static char *telnet_init(char *host, int port, char **realhost)
 /*
  * Called to send data down the Telnet connection.
  */
-static void telnet_send(char *buf, int len)
+static int telnet_send(char *buf, int len)
 {
     char *p;
     static unsigned char iac[2] = { IAC, IAC };
@@ -660,7 +664,7 @@ static void telnet_send(char *buf, int len)
 #endif
 
     if (s == NULL)
-       return;
+       return 0;
 
     p = buf;
     while (p < buf + len) {
@@ -668,13 +672,24 @@ static void telnet_send(char *buf, int len)
 
        while (iswritable((unsigned char) *p) && p < buf + len)
            p++;
-       sk_write(s, q, p - q);
+       telnet_bufsize = sk_write(s, q, p - q);
 
        while (p < buf + len && !iswritable((unsigned char) *p)) {
-           sk_write(s, (unsigned char) *p == IAC ? iac : cr, 2);
+           telnet_bufsize = 
+               sk_write(s, (unsigned char) *p == IAC ? iac : cr, 2);
            p++;
        }
     }
+
+    return telnet_bufsize;
+}
+
+/*
+ * Called to query the current socket sendability status.
+ */
+static int telnet_sendbuffer(void)
+{
+    return telnet_bufsize;
 }
 
 /*
@@ -696,7 +711,7 @@ static void telnet_size(void)
     b[6] = rows & 0xFF;
     b[7] = IAC;
     b[8] = SE;
-    sk_write(s, b, 9);
+    telnet_bufsize = sk_write(s, b, 9);
     sprintf(logbuf, "client:\tSB NAWS %d,%d",
            ((unsigned char) b[3] << 8) + (unsigned char) b[4],
            ((unsigned char) b[5] << 8) + (unsigned char) b[6]);
@@ -717,59 +732,59 @@ static void telnet_special(Telnet_Special code)
     switch (code) {
       case TS_AYT:
        b[1] = AYT;
-       sk_write(s, b, 2);
+       telnet_bufsize = sk_write(s, b, 2);
        break;
       case TS_BRK:
        b[1] = BREAK;
-       sk_write(s, b, 2);
+       telnet_bufsize = sk_write(s, b, 2);
        break;
       case TS_EC:
        b[1] = EC;
-       sk_write(s, b, 2);
+       telnet_bufsize = sk_write(s, b, 2);
        break;
       case TS_EL:
        b[1] = EL;
-       sk_write(s, b, 2);
+       telnet_bufsize = sk_write(s, b, 2);
        break;
       case TS_GA:
        b[1] = GA;
-       sk_write(s, b, 2);
+       telnet_bufsize = sk_write(s, b, 2);
        break;
       case TS_NOP:
        b[1] = NOP;
-       sk_write(s, b, 2);
+       telnet_bufsize = sk_write(s, b, 2);
        break;
       case TS_ABORT:
        b[1] = ABORT;
-       sk_write(s, b, 2);
+       telnet_bufsize = sk_write(s, b, 2);
        break;
       case TS_AO:
        b[1] = AO;
-       sk_write(s, b, 2);
+       telnet_bufsize = sk_write(s, b, 2);
        break;
       case TS_IP:
        b[1] = IP;
-       sk_write(s, b, 2);
+       telnet_bufsize = sk_write(s, b, 2);
        break;
       case TS_SUSP:
        b[1] = SUSP;
-       sk_write(s, b, 2);
+       telnet_bufsize = sk_write(s, b, 2);
        break;
       case TS_EOR:
        b[1] = EOR;
-       sk_write(s, b, 2);
+       telnet_bufsize = sk_write(s, b, 2);
        break;
       case TS_EOF:
        b[1] = xEOF;
-       sk_write(s, b, 2);
+       telnet_bufsize = sk_write(s, b, 2);
        break;
       case TS_EOL:
-       sk_write(s, "\r\n", 2);
+       telnet_bufsize = sk_write(s, "\r\n", 2);
        break;
       case TS_SYNCH:
        b[1] = DM;
-       sk_write(s, b, 1);
-       sk_write_oob(s, b + 1, 1);
+       telnet_bufsize = sk_write(s, b, 1);
+       telnet_bufsize = sk_write_oob(s, b + 1, 1);
        break;
       case TS_RECHO:
        if (o_echo.state == INACTIVE || o_echo.state == REALLY_INACTIVE) {
@@ -786,7 +801,7 @@ static void telnet_special(Telnet_Special code)
       case TS_PING:
        if (o_they_sga.state == ACTIVE) {
            b[1] = NOP;
-           sk_write(s, b, 2);
+           telnet_bufsize = sk_write(s, b, 2);
        }
        break;
     }
@@ -802,6 +817,11 @@ static int telnet_sendok(void)
     return 1;
 }
 
+static void telnet_unthrottle(int backlog)
+{
+    sk_set_frozen(s, backlog > TELNET_MAX_BACKLOG);
+}
+
 static int telnet_ldisc(int option)
 {
     if (option == LD_ECHO)
@@ -814,10 +834,12 @@ static int telnet_ldisc(int option)
 Backend telnet_backend = {
     telnet_init,
     telnet_send,
+    telnet_sendbuffer,
     telnet_size,
     telnet_special,
     telnet_socket,
     telnet_sendok,
     telnet_ldisc,
+    telnet_unthrottle,
     23
 };
index f0dc9b1..87c151e 100644 (file)
@@ -3281,13 +3281,33 @@ int term_ldisc(int option)
 /*
  * from_backend(), to get data from the backend for the terminal.
  */
-void from_backend(int is_stderr, char *data, int len)
+int from_backend(int is_stderr, char *data, int len)
 {
     while (len--) {
        if (inbuf_head >= INBUF_SIZE)
            term_out();
        inbuf[inbuf_head++] = *data++;
     }
+
+    /*
+     * We process all stdout/stderr data immediately we receive it,
+     * and don't return until it's all gone. Therefore, there's no
+     * reason at all to return anything other than zero from this
+     * function.
+     * 
+     * This is a slightly suboptimal way to deal with SSH2 - in
+     * principle, the window mechanism would allow us to continue
+     * to accept data on forwarded ports and X connections even
+     * while the terminal processing was going slowly - but we
+     * can't do the 100% right thing without moving the terminal
+     * processing into a separate thread, and that might hurt
+     * portability. So we manage stdout buffering the old SSH1 way:
+     * if the terminal processing goes slowly, the whole SSH
+     * connection stops accepting data until it's ready.
+     * 
+     * In practice, I can't imagine this causing serious trouble.
+     */
+    return 0;
 }
 
 /*
index 15ad7b3..e2107ec 100644 (file)
--- a/window.c
+++ b/window.c
@@ -1863,6 +1863,14 @@ static LRESULT CALLBACK WndProc(HWND hwnd, UINT message,
                len = TranslateKey(message, wParam, lParam, buf);
                if (len == -1)
                    return DefWindowProc(hwnd, message, wParam, lParam);
+
+               /*
+                * We need not bother about stdin backlogs here,
+                * because in GUI PuTTY we can't do anything about
+                * it anyway; there's no means of asking Windows to
+                * hold off on KEYDOWN messages. We _have_ to
+                * buffer everything we're sent.
+                */
                ldisc_send(buf, len);
 
                if (len > 0)
@@ -2998,6 +3006,15 @@ static int TranslateKey(UINT message, WPARAM wParam, LPARAM lParam,
                            luni_send(&keybuf, 1);
                        } else {
                            ch = (char) alt_sum;
+                           /*
+                            * We need not bother about stdin
+                            * backlogs here, because in GUI PuTTY
+                            * we can't do anything about it
+                            * anyway; there's no means of asking
+                            * Windows to hold off on KEYDOWN
+                            * messages. We _have_ to buffer
+                            * everything we're sent.
+                            */
                            ldisc_send(&ch, 1);
                        }
                        alt_sum = 0;
index bcb9348..ee2fc3a 100644 (file)
--- a/winnet.c
+++ b/winnet.c
 #include <windows.h>
 #include <stdio.h>
 #include <stdlib.h>
+#include <assert.h>
 
 #define DEFINE_PLUG_METHOD_MACROS
 #include "putty.h"
 #include "network.h"
 #include "tree234.h"
 
-#define BUFFER_GRANULE  512
-
 struct Socket_tag {
     struct socket_function_table *fn;
     /* the above variable absolutely *must* be the first in this structure */
@@ -63,9 +62,12 @@ struct Socket_tag {
     SOCKET s;
     Plug plug;
     void *private_ptr;
-    struct buffer *head, *tail;
+    bufchain output_data;
     int writable;
-    int frozen; /* this tells the write stuff not to even bother trying to send at this point */
+    int frozen; /* this causes readability notifications to be ignored */
+    int frozen_readable; /* this means we missed at least one readability
+                         * notification while we were frozen */
+    char oobdata[1];
     int sending_oob;
     int oobinline;
 };
@@ -90,12 +92,6 @@ struct SockAddr_tag {
 #endif
 };
 
-struct buffer {
-    struct buffer *next;
-    int buflen, bufpos;
-    char buf[BUFFER_GRANULE];
-};
-
 static tree234 *sktree;
 
 static int cmpfortree(void *av, void *bv)
@@ -365,8 +361,8 @@ static void sk_tcp_flush(Socket s)
 }
 
 static void sk_tcp_close(Socket s);
-static void sk_tcp_write(Socket s, char *data, int len);
-static void sk_tcp_write_oob(Socket s, char *data, int len);
+static int sk_tcp_write(Socket s, char *data, int len);
+static int sk_tcp_write_oob(Socket s, char *data, int len);
 static char *sk_tcp_socket_error(Socket s);
 
 extern char *do_select(SOCKET skt, int startup);
@@ -393,10 +389,11 @@ Socket sk_register(void *sock, Plug plug)
     ret->fn = &fn_table;
     ret->error = NULL;
     ret->plug = plug;
-    ret->head = ret->tail = NULL;
+    bufchain_init(&ret->output_data);
     ret->writable = 1;                /* to start with */
     ret->sending_oob = 0;
     ret->frozen = 1;
+    ret->frozen_readable = 0;
 
     ret->s = (SOCKET)sock;
 
@@ -450,10 +447,11 @@ Socket sk_new(SockAddr addr, int port, int privport, int oobinline,
     ret->fn = &fn_table;
     ret->error = NULL;
     ret->plug = plug;
-    ret->head = ret->tail = NULL;
+    bufchain_init(&ret->output_data);
     ret->writable = 1;                /* to start with */
     ret->sending_oob = 0;
     ret->frozen = 0;
+    ret->frozen_readable = 0;
 
     /*
      * Open socket.
@@ -601,10 +599,11 @@ Socket sk_newlistenner(int port, Plug plug)
     ret->fn = &fn_table;
     ret->error = NULL;
     ret->plug = plug;
-    ret->head = ret->tail = NULL;
+    bufchain_init(&ret->output_data);
     ret->writable = 0;                /* to start with */
     ret->sending_oob = 0;
     ret->frozen = 0;
+    ret->frozen_readable = 0;
 
     /*
      * Open socket.
@@ -694,22 +693,22 @@ static void sk_tcp_close(Socket sock)
  */
 void try_send(Actual_Socket s)
 {
-    if (s->frozen) return;
-    while (s->head) {
+    while (s->sending_oob || bufchain_size(&s->output_data) > 0) {
        int nsent;
        DWORD err;
+       void *data;
        int len, urgentflag;
 
        if (s->sending_oob) {
            urgentflag = MSG_OOB;
            len = s->sending_oob;
+           data = &s->oobdata;
        } else {
            urgentflag = 0;
-           len = s->head->buflen - s->head->bufpos;
+           bufchain_prefix(&s->output_data, &data, &len);
        }
 
-       nsent =
-           send(s->s, s->head->buf + s->head->bufpos, len, urgentflag);
+       nsent = send(s->s, data, len, urgentflag);
        noise_ultralight(nsent);
        if (nsent <= 0) {
            err = (nsent < 0 ? WSAGetLastError() : 0);
@@ -742,83 +741,48 @@ void try_send(Actual_Socket s)
                fatalbox(winsock_error_string(err));
            }
        } else {
-           s->head->bufpos += nsent;
-           if (s->sending_oob)
-               s->sending_oob -= nsent;
-           if (s->head->bufpos >= s->head->buflen) {
-               struct buffer *tmp = s->head;
-               s->head = tmp->next;
-               sfree(tmp);
-               if (!s->head)
-                   s->tail = NULL;
+           if (s->sending_oob) {
+               if (nsent < len) {
+                   memmove(s->oobdata, s->oobdata+nsent, len-nsent);
+                   s->sending_oob = len - nsent;
+               } else {
+                   s->sending_oob = 0;
+               }
+           } else {
+               bufchain_consume(&s->output_data, nsent);
            }
        }
     }
 }
 
-static void sk_tcp_write(Socket sock, char *buf, int len)
+static int sk_tcp_write(Socket sock, char *buf, int len)
 {
     Actual_Socket s = (Actual_Socket) sock;
 
     /*
      * Add the data to the buffer list on the socket.
      */
-    if (s->tail && s->tail->buflen < BUFFER_GRANULE) {
-       int copylen = min(len, BUFFER_GRANULE - s->tail->buflen);
-       memcpy(s->tail->buf + s->tail->buflen, buf, copylen);
-       buf += copylen;
-       len -= copylen;
-       s->tail->buflen += copylen;
-    }
-    while (len > 0) {
-       int grainlen = min(len, BUFFER_GRANULE);
-       struct buffer *newbuf;
-       newbuf = smalloc(sizeof(struct buffer));
-       newbuf->bufpos = 0;
-       newbuf->buflen = grainlen;
-       memcpy(newbuf->buf, buf, grainlen);
-       buf += grainlen;
-       len -= grainlen;
-       if (s->tail)
-           s->tail->next = newbuf;
-       else
-           s->head = s->tail = newbuf;
-       newbuf->next = NULL;
-       s->tail = newbuf;
-    }
+    bufchain_add(&s->output_data, buf, len);
 
     /*
      * Now try sending from the start of the buffer list.
      */
     if (s->writable)
        try_send(s);
+
+    return bufchain_size(&s->output_data);
 }
 
-static void sk_tcp_write_oob(Socket sock, char *buf, int len)
+static int sk_tcp_write_oob(Socket sock, char *buf, int len)
 {
     Actual_Socket s = (Actual_Socket) sock;
 
     /*
      * Replace the buffer list on the socket with the data.
      */
-    if (!s->head) {
-       s->head = smalloc(sizeof(struct buffer));
-    } else {
-       struct buffer *walk = s->head->next;
-       while (walk) {
-           struct buffer *tmp = walk;
-           walk = tmp->next;
-           sfree(tmp);
-       }
-    }
-    s->head->next = NULL;
-    s->tail = s->head;
-    s->head->buflen = len;
-    memcpy(s->head->buf, buf, len);
-
-    /*
-     * Set the Urgent marker.
-     */
+    bufchain_clear(&s->output_data);
+    assert(len <= sizeof(s->oobdata));
+    memcpy(s->oobdata, buf, len);
     s->sending_oob = len;
 
     /*
@@ -826,6 +790,8 @@ static void sk_tcp_write_oob(Socket sock, char *buf, int len)
      */
     if (s->writable)
        try_send(s);
+
+    return s->sending_oob;
 }
 
 int select_result(WPARAM wParam, LPARAM lParam)
@@ -853,10 +819,11 @@ int select_result(WPARAM wParam, LPARAM lParam)
 
     switch (WSAGETSELECTEVENT(lParam)) {
       case FD_READ:
-
        /* In the case the socket is still frozen, we don't even bother */
-       if (s->frozen)
+       if (s->frozen) {
+           s->frozen_readable = 1;
            break;
+       }
 
        /*
         * We have received data on the socket. For an oobinline
@@ -911,8 +878,15 @@ int select_result(WPARAM wParam, LPARAM lParam)
        }
        break;
       case FD_WRITE:
-       s->writable = 1;
-       try_send(s);
+       {
+           int bufsize_before, bufsize_after;
+           s->writable = 1;
+           bufsize_before = s->sending_oob + bufchain_size(&s->output_data);
+           try_send(s);
+           bufsize_after = s->sending_oob + bufchain_size(&s->output_data);
+           if (bufsize_after < bufsize_before)
+               plug_sent(s->plug, bufsize_after);
+       }
        break;
       case FD_CLOSE:
        /* Signal a close on the socket. First read any outstanding data. */
@@ -992,11 +966,14 @@ static char *sk_tcp_socket_error(Socket sock)
 void sk_set_frozen(Socket sock, int is_frozen)
 {
     Actual_Socket s = (Actual_Socket) sock;
+    if (s->frozen == is_frozen)
+       return;
     s->frozen = is_frozen;
-    if (!is_frozen) {
+    if (!is_frozen && s->frozen_readable) {
        char c;
        recv(s->s, &c, 1, MSG_PEEK);
     }
+    s->frozen_readable = 0;
 }
 
 /*
index b13f695..0e0df20 100644 (file)
--- a/x11fwd.c
+++ b/x11fwd.c
@@ -58,9 +58,6 @@
 #define PUT_16BIT(endian, cp, val) \
   (endian=='B' ? PUT_16BIT_MSB_FIRST(cp, val) : PUT_16BIT_LSB_FIRST(cp, val))
 
-extern void sshfwd_close(void *);
-extern void sshfwd_write(void *, char *, int);
-
 struct X11Private {
     struct plug_function_table *fn;
     /* the above variable absolutely *must* be the first in this structure */
@@ -69,6 +66,7 @@ struct X11Private {
     unsigned char *auth_data;
     int data_read, auth_plen, auth_psize, auth_dlen, auth_dsize;
     int verified;
+    int throttled, throttle_override;
     void *c;                          /* data used by ssh.c */
     Socket s;
 };
@@ -127,10 +125,21 @@ static int x11_receive(Plug plug, int urgent, char *data, int len)
 {
     struct X11Private *pr = (struct X11Private *) plug;
 
-    sshfwd_write(pr->c, data, len);
+    if (sshfwd_write(pr->c, data, len) > 0) {
+       pr->throttled = 1;
+       sk_set_frozen(pr->s, 1);
+    }
+
     return 1;
 }
 
+static void x11_sent(Plug plug, int bufsize)
+{
+    struct X11Private *pr = (struct X11Private *) plug;
+
+    sshfwd_unthrottle(pr->c, bufsize);
+}
+
 /*
  * Called to set up the raw connection.
  * 
@@ -141,7 +150,9 @@ char *x11_init(Socket * s, char *display, void *c)
 {
     static struct plug_function_table fn_table = {
        x11_closing,
-       x11_receive
+       x11_receive,
+       x11_sent,
+       NULL
     };
 
     SockAddr addr;
@@ -181,6 +192,7 @@ char *x11_init(Socket * s, char *display, void *c)
     pr->auth_protocol = NULL;
     pr->verified = 0;
     pr->data_read = 0;
+    pr->throttled = pr->throttle_override = 0;
     pr->c = c;
 
     pr->s = *s = sk_new(addr, port, 0, 1, (Plug) pr);
@@ -210,15 +222,37 @@ void x11_close(Socket s)
     sk_close(s);
 }
 
+void x11_unthrottle(Socket s)
+{
+    struct X11Private *pr;
+    if (!s)
+       return;
+    pr = (struct X11Private *) sk_get_private_ptr(s);
+
+    pr->throttled = 0;
+    sk_set_frozen(s, pr->throttled || pr->throttle_override);
+}
+
+void x11_override_throttle(Socket s, int enable)
+{
+    struct X11Private *pr;
+    if (!s)
+       return;
+    pr = (struct X11Private *) sk_get_private_ptr(s);
+
+    pr->throttle_override = enable;
+    sk_set_frozen(s, pr->throttled || pr->throttle_override);
+}
+
 /*
  * Called to send data down the raw connection.
  */
-void x11_send(Socket s, char *data, int len)
+int x11_send(Socket s, char *data, int len)
 {
     struct X11Private *pr = (struct X11Private *) sk_get_private_ptr(s);
 
     if (s == NULL)
-       return;
+       return 0;
 
     /*
      * Read the first packet.
@@ -226,7 +260,7 @@ void x11_send(Socket s, char *data, int len)
     while (len > 0 && pr->data_read < 12)
        pr->firstpkt[pr->data_read++] = (unsigned char) (len--, *data++);
     if (pr->data_read < 12)
-       return;
+       return 0;
 
     /*
      * If we have not allocated the auth_protocol and auth_data
@@ -251,7 +285,7 @@ void x11_send(Socket s, char *data, int len)
        pr->auth_data[pr->data_read++ - 12 -
                      pr->auth_psize] = (unsigned char) (len--, *data++);
     if (pr->data_read < 12 + pr->auth_psize + pr->auth_dsize)
-       return;
+       return 0;
 
     /*
      * If we haven't verified the authentication, do so now.
@@ -280,7 +314,7 @@ void x11_send(Socket s, char *data, int len)
            sshfwd_write(pr->c, reply, 8 + msgsize);
            sshfwd_close(pr->c);
            x11_close(s);
-           return;
+           return 0;
        }
 
        /*
@@ -298,5 +332,5 @@ void x11_send(Socket s, char *data, int len)
      * After initialisation, just copy data simply.
      */
 
-    sk_write(s, data, len);
+    return sk_write(s, data, len);
 }