#include <windows.h>
#include <stdio.h>
#include <stdlib.h>
+#include <assert.h>
#include "putty.h"
/*
+ * Generic routines to deal with send buffers: a linked list of
+ * smallish blocks, with the operations
+ *
+ * - add an arbitrary amount of data to the end of the list
+ * - remove the first N bytes from the list
+ * - return a (pointer,length) pair giving some initial data in
+ * the list, suitable for passing to a send or write system
+ * call
+ * - return the current size of the buffer chain in bytes
+ */
+
+#define BUFFER_GRANULE 512
+
+struct bufchain_granule { /* one fixed-size block in a bufchain's singly linked list */
+ struct bufchain_granule *next; /* following block; NULL on the last block */
+ int buflen, bufpos; /* buflen: bytes stored in buf; bufpos: bytes already consumed from its front */
+ char buf[BUFFER_GRANULE]; /* payload storage for this block */
+};
+
+/*
+ * Put a bufchain into its empty state: no blocks linked in and a
+ * byte count of zero. Does not free anything already attached.
+ */
+void bufchain_init(bufchain *ch)
+{
+    ch->buffersize = 0;
+    ch->head = NULL;
+    ch->tail = NULL;
+}
+
+/*
+ * Free every block in the chain and leave the chain empty.
+ */
+void bufchain_clear(bufchain *ch)
+{
+    struct bufchain_granule *cur = ch->head;
+
+    while (cur != NULL) {
+        /* Fetch the successor before the current block is released. */
+        struct bufchain_granule *following = cur->next;
+        sfree(cur);
+        cur = following;
+    }
+
+    ch->head = NULL;
+    ch->tail = NULL;
+    ch->buffersize = 0;
+}
+
+/*
+ * Report how many bytes are currently held in the chain.
+ */
+int bufchain_size(bufchain *ch)
+{
+    int total = ch->buffersize;
+
+    return total;
+}
+
+/*
+ * Append `len' bytes starting at `data' to the end of the chain.
+ *
+ * Any free space in the current tail block is filled first; the
+ * remainder is copied into freshly allocated blocks of at most
+ * BUFFER_GRANULE bytes each. A non-positive `len' is a no-op.
+ */
+void bufchain_add(bufchain *ch, void *data, int len)
+{
+    char *buf = (char *)data;
+
+    /*
+     * A negative length would shrink buffersize and be handed to
+     * memcpy as an enormous size, so reject it (and zero) up front.
+     */
+    if (len <= 0)
+        return;
+
+    ch->buffersize += len;
+
+    /* Top up the partially filled tail block, if there is one. */
+    if (ch->tail && ch->tail->buflen < BUFFER_GRANULE) {
+        int copylen = min(len, BUFFER_GRANULE - ch->tail->buflen);
+        memcpy(ch->tail->buf + ch->tail->buflen, buf, copylen);
+        buf += copylen;
+        len -= copylen;
+        ch->tail->buflen += copylen;
+    }
+
+    /* Whatever is left goes into new blocks appended to the list. */
+    while (len > 0) {
+        int grainlen = min(len, BUFFER_GRANULE);
+        struct bufchain_granule *newbuf;
+
+        newbuf = smalloc(sizeof(struct bufchain_granule));
+        newbuf->bufpos = 0;
+        newbuf->buflen = grainlen;
+        newbuf->next = NULL;
+        memcpy(newbuf->buf, buf, grainlen);
+        buf += grainlen;
+        len -= grainlen;
+
+        if (ch->tail)
+            ch->tail->next = newbuf;
+        else
+            ch->head = newbuf;         /* first block of an empty chain */
+        ch->tail = newbuf;
+    }
+}
+
+/*
+ * Discard `len' bytes from the front of the chain. The bytes must
+ * all lie within the first block (i.e. `len' may not exceed what
+ * bufchain_prefix reports); this is asserted. A block that becomes
+ * fully consumed is unlinked and freed.
+ */
+void bufchain_consume(bufchain *ch, int len)
+{
+    struct bufchain_granule *first;
+
+    assert(ch->buffersize >= len);
+    assert(ch->head != NULL && ch->head->bufpos + len <= ch->head->buflen);
+
+    first = ch->head;
+    first->bufpos += len;
+    ch->buffersize -= len;
+
+    /* Block still holds unread data: nothing to unlink. */
+    if (first->bufpos < first->buflen)
+        return;
+
+    ch->head = first->next;
+    sfree(first);
+    if (ch->head == NULL)
+        ch->tail = NULL;
+}
+
+/*
+ * Return, via `data' and `len', a pointer to and the length of the
+ * unconsumed bytes in the first block of the chain. The pointer
+ * refers to storage inside the chain, not a copy.
+ *
+ * The chain must not be empty; this is asserted, matching the
+ * precondition check in bufchain_consume.
+ */
+void bufchain_prefix(bufchain *ch, void **data, int *len)
+{
+    assert(ch->head != NULL);
+    *len = ch->head->buflen - ch->head->bufpos;
+    *data = ch->head->buf + ch->head->bufpos;
+}
+
+/*
* My own versions of malloc, realloc and free. Because I want
* malloc and realloc to bomb out and exit the program if they run
* out of memory, realloc to reliably call malloc if passed a NULL
#include "puttymem.h"
+struct bufchain_granule;
+typedef struct bufchain_tag { /* send buffer kept as a linked list of bufchain_granule blocks */
+ struct bufchain_granule *head, *tail; /* first and last blocks of the list */
+ int buffersize; /* current amount of buffered data */
+} bufchain;
+
+void bufchain_init(bufchain *ch);
+void bufchain_clear(bufchain *ch);
+int bufchain_size(bufchain *ch);
+void bufchain_add(bufchain *ch, void *data, int len);
+void bufchain_prefix(bufchain *ch, void **data, int *len);
+void bufchain_consume(bufchain *ch, int len);
/*
* Debugging functions.
/* if p is NULL, it doesn't change the plug */
/* but it does return the one it's using */
void (*close) (Socket s);
- void (*write) (Socket s, char *data, int len);
- void (*write_oob) (Socket s, char *data, int len);
+ int (*write) (Socket s, char *data, int len);
+ int (*write_oob) (Socket s, char *data, int len);
void (*flush) (Socket s);
/* ignored by tcp, but vital for ssl */
char *(*socket_error) (Socket s);
* - urgent==2. `data' points to `len' bytes of data,
* the first of which was the one at the Urgent mark.
*/
+ void (*sent) (Plug p, int bufsize);
+ /*
+ * The `sent' function is called when the pending send backlog
+ * on a socket is cleared or partially cleared. The new backlog
+ * size is passed in the `bufsize' parameter.
+ */
int (*accepting)(Plug p, struct sockaddr *addr, void *sock);
/*
* returns 0 if the host at address addr is a valid host for connecting or error
#ifdef DEFINE_PLUG_METHOD_MACROS
#define plug_closing(p,msg,code,callback) (((*p)->closing) (p, msg, code, callback))
#define plug_receive(p,urgent,buf,len) (((*p)->receive) (p, urgent, buf, len))
+#define plug_sent(p,bufsize) (((*p)->sent) (p, bufsize))
#define plug_accepting(p, addr, sock) (((*p)->accepting)(p, addr, sock))
#endif
/*
* Set the `frozen' flag on a socket. A frozen socket is one in
- * which all sends are buffered and receives are ignored. This is
- * so that (for example) a new port-forwarding can sit in limbo
- * until its associated SSH channel is ready, and then pending data
- * can be sent on.
+ * which all READABLE notifications are ignored, so that data is
+ * not accepted from the peer until the socket is unfrozen. This
+ * exists for two purposes:
+ *
+ * - Port forwarding: when a local listening port receives a
+ * connection, we do not want to receive data from the new
+ * socket until we have somewhere to send it. Hence, we freeze
+ * the socket until its associated SSH channel is ready; then we
+ * unfreeze it and pending data is delivered.
+ *
+ * - Socket buffering: if an SSH channel (or the whole connection)
+ * backs up or presents a zero window, we must freeze the
+ * associated local socket in order to avoid unbounded buffer
+ * growth.
*/
void sk_set_frozen(Socket sock, int is_frozen);
#include "storage.h"
#include "tree234.h"
+#define MAX_STDIN_BACKLOG 4096
+
void fatalbox(char *p, ...)
{
va_list ap;
WSAEVENT netevent;
-void from_backend(int is_stderr, char *data, int len)
-{
- int pos;
- DWORD ret;
- HANDLE h = (is_stderr ? errhandle : outhandle);
-
- pos = 0;
- while (pos < len) {
- if (!WriteFile(h, data + pos, len - pos, &ret, NULL))
- return; /* give up in panic */
- pos += ret;
- }
-}
-
int term_ldisc(int mode)
{
return FALSE;
SetConsoleMode(inhandle, mode);
}
-struct input_data {
- DWORD len;
- char buffer[4096];
- HANDLE event, eventback;
-};
-
static int get_line(const char *prompt, char *str, int maxlen, int is_pw)
{
HANDLE hin, hout;
return 1;
}
+struct input_data { /* parameter block passed to stdin_read_thread */
+ DWORD len; /* byte count of buffer; the main loop treats 0 as end of input */
+ char buffer[4096]; /* data the main loop hands to back->send() */
+ HANDLE event, eventback; /* event: waited on by the main loop; eventback: set by the main loop when it wants more input (presumably wakes the reader thread) */
+};
+
static DWORD WINAPI stdin_read_thread(void *param)
{
struct input_data *idata = (struct input_data *) param;
return 0;
}
+struct output_data { /* parameter block shared with one stdout_write_thread instance */
+ DWORD len, lenwritten; /* len: bytes to write; lenwritten: byte count reported back by WriteFile */
+ int writeret; /* return value of the last WriteFile call (zero means failure) */
+ char *buffer; /* data to write; set by try_output to point into the bufchain */
+ int is_stderr, done; /* is_stderr: write to stderr rather than stdout; done: nonzero tells the thread to exit */
+ HANDLE event, eventback; /* event: set by the thread after a write; eventback: set by try_output to start a write */
+ int busy; /* nonzero while a write has been handed to the thread and not yet acknowledged */
+};
+
+/*
+ * Thread body used for both the stdout and the stderr writer.
+ * `param' is a struct output_data. Each time `eventback' is
+ * signalled the thread performs one WriteFile of buffer/len to the
+ * selected standard handle, stores the result in writeret and
+ * lenwritten, and signals `event'. It exits when `done' is set.
+ */
+static DWORD WINAPI stdout_write_thread(void *param)
+{
+    struct output_data *job = (struct output_data *) param;
+    HANDLE std_out = GetStdHandle(STD_OUTPUT_HANDLE);
+    HANDLE std_err = GetStdHandle(STD_ERROR_HANDLE);
+
+    for (;;) {
+        HANDLE target;
+
+        WaitForSingleObject(job->eventback, INFINITE);
+        if (job->done)
+            break;
+
+        target = job->is_stderr ? std_err : std_out;
+        job->writeret = WriteFile(target, job->buffer, job->len,
+                                  &job->lenwritten, NULL);
+        SetEvent(job->event);
+    }
+
+    return 0;
+}
+
+bufchain stdout_data, stderr_data;
+struct output_data odata, edata;
+
+/*
+ * Hand the next chunk of buffered stdout (is_stderr == 0) or stderr
+ * (is_stderr != 0) data to the corresponding writer thread, unless
+ * that thread is still busy with a previous chunk or there is
+ * nothing buffered.
+ *
+ * The chunk is not removed from the bufchain here; the main loop
+ * consumes it once the writer thread reports how much was written.
+ */
+void try_output(int is_stderr)
+{
+    struct output_data *data = (is_stderr ? &edata : &odata);
+    bufchain *chain = (is_stderr ? &stderr_data : &stdout_data);
+    void *senddata;
+    int sendlen;
+
+    if (data->busy)
+        return;
+
+    /*
+     * bufchain_prefix dereferences the head block, so an empty
+     * chain (e.g. after from_backend was called with len == 0)
+     * must not reach it.
+     */
+    if (bufchain_size(chain) == 0)
+        return;
+
+    bufchain_prefix(chain, &senddata, &sendlen);
+    data->buffer = senddata;
+    data->len = sendlen;
+    SetEvent(data->eventback);
+    data->busy = 1;
+}
+
+/*
+ * Accept `len' bytes of session output from the back end. The data
+ * is queued on the stdout or stderr bufchain (selected by
+ * is_stderr) and a write is started if the relevant writer thread
+ * is idle.
+ *
+ * Returns the total number of bytes now buffered across both
+ * stdout and stderr, which the back end uses as its backlog figure.
+ */
+int from_backend(int is_stderr, char *data, int len)
+{
+    int osize, esize;
+
+    if (is_stderr) {
+        bufchain_add(&stderr_data, data, len);
+        try_output(1);
+    } else {
+        bufchain_add(&stdout_data, data, len);
+        try_output(0);
+    }
+
+    osize = bufchain_size(&stdout_data);
+    esize = bufchain_size(&stderr_data);
+
+    return osize + esize;
+}
+
/*
* Short description of parameters.
*/
{
WSADATA wsadata;
WORD winsock_ver;
- WSAEVENT stdinevent;
- HANDLE handles[2];
- DWORD threadid;
+ WSAEVENT stdinevent, stdoutevent, stderrevent;
+ HANDLE handles[4];
+ DWORD in_threadid, out_threadid, err_threadid;
struct input_data idata;
+ int reading;
int sending;
int portnumber = -1;
SOCKET *sklist;
connopen = 1;
stdinevent = CreateEvent(NULL, FALSE, FALSE, NULL);
+ stdoutevent = CreateEvent(NULL, FALSE, FALSE, NULL);
+ stderrevent = CreateEvent(NULL, FALSE, FALSE, NULL);
inhandle = GetStdHandle(STD_INPUT_HANDLE);
outhandle = GetStdHandle(STD_OUTPUT_HANDLE);
*/
handles[0] = netevent;
handles[1] = stdinevent;
+ handles[2] = stdoutevent;
+ handles[3] = stderrevent;
sending = FALSE;
+
+ /*
+ * Create spare threads to write to stdout and stderr, so we
+ * can arrange asynchronous writes.
+ */
+ odata.event = stdoutevent;
+ odata.eventback = CreateEvent(NULL, FALSE, FALSE, NULL);
+ odata.is_stderr = 0;
+ odata.busy = odata.done = 0;
+ if (!CreateThread(NULL, 0, stdout_write_thread,
+ &odata, 0, &out_threadid)) {
+ fprintf(stderr, "Unable to create output thread\n");
+ exit(1);
+ }
+ edata.event = stderrevent;
+ edata.eventback = CreateEvent(NULL, FALSE, FALSE, NULL);
+ edata.is_stderr = 1;
+ edata.busy = edata.done = 0;
+ if (!CreateThread(NULL, 0, stdout_write_thread,
+ &edata, 0, &err_threadid)) {
+ fprintf(stderr, "Unable to create error output thread\n");
+ exit(1);
+ }
+
while (1) {
int n;
idata.event = stdinevent;
idata.eventback = CreateEvent(NULL, FALSE, FALSE, NULL);
if (!CreateThread(NULL, 0, stdin_read_thread,
- &idata, 0, &threadid)) {
- fprintf(stderr, "Unable to create second thread\n");
+ &idata, 0, &in_threadid)) {
+ fprintf(stderr, "Unable to create input thread\n");
exit(1);
}
sending = TRUE;
}
- n = WaitForMultipleObjects(2, handles, FALSE, INFINITE);
+ n = WaitForMultipleObjects(4, handles, FALSE, INFINITE);
if (n == 0) {
WSANETWORKEVENTS things;
SOCKET socket;
}
}
} else if (n == 1) {
+ reading = 0;
noise_ultralight(idata.len);
if (idata.len > 0) {
back->send(idata.buffer, idata.len);
} else {
back->special(TS_EOF);
}
+ } else if (n == 2) {
+ odata.busy = 0;
+ if (!odata.writeret) {
+ fprintf(stderr, "Unable to write to standard output\n");
+ exit(0);
+ }
+ bufchain_consume(&stdout_data, odata.lenwritten);
+ if (bufchain_size(&stdout_data) > 0)
+ try_output(0);
+ back->unthrottle(bufchain_size(&stdout_data) +
+ bufchain_size(&stderr_data));
+ } else if (n == 3) {
+ edata.busy = 0;
+ if (!edata.writeret) {
+ fprintf(stderr, "Unable to write to standard output\n");
+ exit(0);
+ }
+ bufchain_consume(&stderr_data, edata.lenwritten);
+ if (bufchain_size(&stderr_data) > 0)
+ try_output(1);
+ back->unthrottle(bufchain_size(&stdout_data) +
+ bufchain_size(&stderr_data));
+ }
+ if (!reading && back->sendbuffer() < MAX_STDIN_BACKLOG) {
SetEvent(idata.eventback);
+ reading = 1;
}
if (!connopen || back->socket() == NULL)
break; /* we closed the connection */
(cp)[0] = (value) >> 8, \
(cp)[1] = (value) )
-extern void sshfwd_close(void *);
-extern void sshfwd_write(void *, char *, int);
-
struct pfwd_queue {
struct pfwd_queue *next;
char *buf;
void *c; /* (channel) data used by ssh.c */
Socket s;
char hostname[128];
+ int throttled, throttle_override;
int port;
int ready;
struct pfwd_queue *waiting;
static int pfd_receive(Plug plug, int urgent, char *data, int len)
{
struct PFwdPrivate *pr = (struct PFwdPrivate *) plug;
-
- if (pr->ready)
- sshfwd_write(pr->c, data, len);
+ if (pr->ready) {
+ if (sshfwd_write(pr->c, data, len) > 0) {
+ pr->throttled = 1;
+ sk_set_frozen(pr->s, 1);
+ }
+ }
return 1;
}
+/*
+ * Plug `sent' callback for forwarded-port sockets: pass the new
+ * local send backlog on to the SSH channel attached to this
+ * forwarding.
+ */
+static void pfd_sent(Plug plug, int bufsize)
+{
+    struct PFwdPrivate *fwd = (struct PFwdPrivate *) plug;
+    void *chan = fwd->c;
+
+    sshfwd_unthrottle(chan, bufsize);
+}
+
/*
* Called when receiving a PORT OPEN from the server
*/
static struct plug_function_table fn_table = {
pfd_closing,
pfd_receive,
+ pfd_sent,
NULL
};
*/
pr = (struct PFwdPrivate *) smalloc(sizeof(struct PFwdPrivate));
pr->fn = &fn_table;
+ pr->throttled = pr->throttle_override = 0;
pr->ready = 1;
pr->c = c;
static struct plug_function_table fn_table = {
pfd_closing,
pfd_receive,
+ pfd_sent,
NULL
};
struct PFwdPrivate *pr, *org;
strcpy(pr->hostname, org->hostname);
pr->port = org->port;
+ pr->throttled = pr->throttle_override = 0;
pr->ready = 0;
pr->waiting = NULL;
{
static struct plug_function_table fn_table = {
pfd_closing,
- pfd_receive, /* should not happen... */
+ pfd_receive, /* should not happen... */
+ pfd_sent, /* also should not happen */
pfd_accepting
};
pr->c = NULL;
strcpy(pr->hostname, desthost);
pr->port = destport;
+ pr->throttled = pr->throttle_override = 0;
pr->ready = 0;
pr->waiting = NULL;
sk_close(s);
}
+/*
+ * Clear the per-forwarding `throttled' flag and refreeze or
+ * unfreeze the socket accordingly; the socket stays frozen if the
+ * global override is still in force. A NULL socket is ignored.
+ */
+void pfd_unthrottle(Socket s)
+{
+    struct PFwdPrivate *fwd;
+
+    if (s == NULL)
+        return;
+
+    fwd = (struct PFwdPrivate *) sk_get_private_ptr(s);
+    fwd->throttled = 0;
+    sk_set_frozen(s, fwd->throttled || fwd->throttle_override);
+}
+
+/*
+ * Set or clear the override flag that freezes this forwarding
+ * regardless of its own `throttled' state, then apply the combined
+ * state to the socket. A NULL socket is ignored.
+ */
+void pfd_override_throttle(Socket s, int enable)
+{
+    struct PFwdPrivate *fwd;
+
+    if (s == NULL)
+        return;
+
+    fwd = (struct PFwdPrivate *) sk_get_private_ptr(s);
+    fwd->throttle_override = enable;
+    sk_set_frozen(s, fwd->throttled || fwd->throttle_override);
+}
+
/*
* Called to send data down the raw connection.
*/
-void pfd_send(Socket s, char *data, int len)
+int pfd_send(Socket s, char *data, int len)
{
if (s == NULL)
- return;
-
- sk_write(s, data, len);
+ return 0;
+ return sk_write(s, data, len);
}
#include "sftp.h"
#include "int64.h"
+/*
+ * Since SFTP is a request-response oriented protocol, it requires
+ * no buffer management: when we send data, we stop and wait for an
+ * acknowledgement _anyway_, and so we can't possibly overfill our
+ * send buffer.
+ */
+
/* ----------------------------------------------------------------------
* String handling routines.
*/
static unsigned outlen; /* how much data required */
static unsigned char *pending = NULL; /* any spare data */
static unsigned pendlen = 0, pendsize = 0; /* length and phys. size of buffer */
-void from_backend(int is_stderr, char *data, int datalen)
+int from_backend(int is_stderr, char *data, int datalen)
{
unsigned char *p = (unsigned char *) data;
unsigned len = (unsigned) datalen;
*/
if (is_stderr) {
fwrite(data, 1, len, stderr);
- return;
+ return 0;
}
/*
* If this is before the real session begins, just return.
*/
if (!outptr)
- return;
+ return 0;
if (outlen > 0) {
unsigned used = outlen;
memcpy(pending + pendlen, p, len);
pendlen += len;
}
+
+ return 0;
}
int sftp_recvdata(char *buf, int len)
{
typedef struct {
char *(*init) (char *host, int port, char **realhost);
- void (*send) (char *buf, int len);
+ /* back->send() returns the current amount of buffered data. */
+ int (*send) (char *buf, int len);
+ /* back->sendbuffer() does the same thing but without attempting a send */
+ int (*sendbuffer) (void);
void (*size) (void);
void (*special) (Telnet_Special code);
- Socket(*socket) (void);
+ Socket(*socket) (void);
int (*sendok) (void);
int (*ldisc) (int);
+ /*
+ * back->unthrottle() tells the back end that the front end
+ * buffer is clearing.
+ */
+ void (*unthrottle) (int);
int default_port;
} Backend;
void term_paste(void);
void term_nopaste(void);
int term_ldisc(int option);
-void from_backend(int is_stderr, char *data, int len);
+int from_backend(int is_stderr, char *data, int len);
void logfopen(void);
void logfclose(void);
void term_copyall(void);
#define TRUE 1
#endif
+#define RAW_MAX_BACKLOG 4096
+
static Socket s = NULL;
+static int raw_bufsize;
static void raw_size(void);
static void c_write(char *buf, int len)
{
- from_backend(0, buf, len);
+ int backlog = from_backend(0, buf, len);
+ sk_set_frozen(s, backlog > RAW_MAX_BACKLOG);
}
static int raw_closing(Plug plug, char *error_msg, int error_code,
/*
* Called to send data down the raw connection.
*/
-static void raw_send(char *buf, int len)
+static int raw_send(char *buf, int len)
{
if (s == NULL)
- return;
+ return 0; /* no socket, so nothing is buffered; the function now returns int */
- sk_write(s, buf, len);
+ raw_bufsize = sk_write(s, buf, len); /* remember the backlog for raw_sendbuffer() */
+
+ return raw_bufsize;
+}
+
+/*
+ * Called to query the current socket sendability status.
+ */
+static int raw_sendbuffer(void)
+{
+ return raw_bufsize; /* backlog recorded by the last sk_write() in raw_send(); no send is attempted here */
}
/*
return 1;
}
+/*
+ * Front end reports its current output backlog: keep the socket
+ * frozen only while that backlog exceeds RAW_MAX_BACKLOG.
+ */
+static void raw_unthrottle(int backlog)
+{
+    int freeze = (backlog > RAW_MAX_BACKLOG);
+
+    sk_set_frozen(s, freeze);
+}
+
static int raw_ldisc(int option)
{
if (option == LD_EDIT || option == LD_ECHO)
Backend raw_backend = {
raw_init,
raw_send,
+ raw_sendbuffer,
raw_size,
raw_special,
raw_socket,
raw_sendok,
raw_ldisc,
+ raw_unthrottle,
1
};
#define TRUE 1
#endif
+#define RLOGIN_MAX_BACKLOG 4096
+
static Socket s = NULL;
+static int rlogin_bufsize;
static void rlogin_size(void);
static void c_write(char *buf, int len)
{
- from_backend(0, buf, len);
+ int backlog = from_backend(0, buf, len);
+ sk_set_frozen(s, backlog > RLOGIN_MAX_BACKLOG);
}
static int rlogin_closing(Plug plug, char *error_msg, int error_code,
sk_write(s, "/", 1);
for (p = cfg.termspeed; isdigit(*p); p++);
sk_write(s, cfg.termspeed, p - cfg.termspeed);
- sk_write(s, &z, 1);
+ rlogin_bufsize = sk_write(s, &z, 1);
}
return NULL;
/*
* Called to send data down the rlogin connection.
*/
-static void rlogin_send(char *buf, int len)
+static int rlogin_send(char *buf, int len)
{
if (s == NULL)
- return;
+ return 0; /* no socket, so nothing is buffered; the function now returns int */
- sk_write(s, buf, len);
+ rlogin_bufsize = sk_write(s, buf, len); /* remember the backlog for rlogin_sendbuffer() */
+
+ return rlogin_bufsize;
+}
+
+/*
+ * Called to query the current socket sendability status.
+ */
+static int rlogin_sendbuffer(void)
+{
+ return rlogin_bufsize; /* backlog recorded by the most recent sk_write() that stored into rlogin_bufsize; no send is attempted here */
}
/*
b[7] = cols & 0xFF;
b[4] = rows >> 8;
b[5] = rows & 0xFF;
- sk_write(s, b, 12);
+ rlogin_bufsize = sk_write(s, b, 12);
return;
}
return 1;
}
+/*
+ * Front end reports its current output backlog: keep the socket
+ * frozen only while that backlog exceeds RLOGIN_MAX_BACKLOG.
+ */
+static void rlogin_unthrottle(int backlog)
+{
+    int freeze = (backlog > RLOGIN_MAX_BACKLOG);
+
+    sk_set_frozen(s, freeze);
+}
+
static int rlogin_ldisc(int option)
{
return 0;
Backend rlogin_backend = {
rlogin_init,
rlogin_send,
+ rlogin_sendbuffer,
rlogin_size,
rlogin_special,
rlogin_socket,
rlogin_sendok,
rlogin_ldisc,
+ rlogin_unthrottle,
1
};
static void gui_update_stats(char *name, unsigned long size,
int percentage, unsigned long elapsed);
+/*
+ * The maximum amount of queued data we accept before we stop and
+ * wait for the server to process some.
+ */
+#define MAX_SCP_BUFSIZE 16384
+
void logevent(char *string)
{
}
static unsigned outlen; /* how much data required */
static unsigned char *pending = NULL; /* any spare data */
static unsigned pendlen = 0, pendsize = 0; /* length and phys. size of buffer */
-void from_backend(int is_stderr, char *data, int datalen)
+int from_backend(int is_stderr, char *data, int datalen)
{
unsigned char *p = (unsigned char *) data;
unsigned len = (unsigned) datalen;
*/
if (is_stderr) {
fwrite(data, 1, len, stderr);
- return;
+ return 0;
}
inbuf_head = 0;
* If this is before the real session begins, just return.
*/
if (!outptr)
- return;
+ return 0;
if (outlen > 0) {
unsigned used = outlen;
memcpy(pending + pendlen, p, len);
pendlen += len;
}
+
+ return 0;
+}
+/*
+ * Block until the SSH socket is readable, then feed one FD_READ
+ * notification to select_result. Returns 1 on success, or 0 if
+ * select() itself failed.
+ */
+static int scp_process_network_event(void)
+{
+    fd_set readfds;
+    int rc;
+
+    FD_ZERO(&readfds);
+    FD_SET(scp_ssh_socket, &readfds);
+
+    rc = select(1, &readfds, NULL, NULL, NULL);
+    if (rc < 0)
+        return 0;                      /* doom */
+
+    select_result((WPARAM) scp_ssh_socket, (LPARAM) FD_READ);
+    return 1;
}
static int ssh_scp_recv(unsigned char *buf, int len)
{
}
while (outlen > 0) {
- fd_set readfds;
-
- FD_ZERO(&readfds);
- FD_SET(scp_ssh_socket, &readfds);
- if (select(1, &readfds, NULL, NULL, NULL) < 0)
+ if (!scp_process_network_event())
return 0; /* doom */
- select_result((WPARAM) scp_ssh_socket, (LPARAM) FD_READ);
}
return len;
for (i = 0; i < size; i += 4096) {
char transbuf[4096];
DWORD j, k = 4096;
+ int bufsize;
+
if (i + k > size)
k = size - i;
if (!ReadFile(f, transbuf, k, &j, NULL) || j != k) {
printf("\n");
bump("%s: Read error", src);
}
- back->send(transbuf, k);
+ bufsize = back->send(transbuf, k);
if (statistics) {
stat_bytes += k;
if (time(NULL) != stat_lasttime || i + k == size) {
stat_starttime, stat_lasttime);
}
}
+
+ /*
+ * If the network transfer is backing up - that is, the
+ * remote site is not accepting data as fast as we can
+ * produce it - then we must loop on network events until
+ * we have space in the buffer again.
+ */
+ while (bufsize > MAX_SCP_BUFSIZE) {
+ if (!scp_process_network_event())
+ bump("%s: Network error occurred", src);
+ bufsize = back->sendbuffer();
+ }
}
CloseHandle(f);
extern char *x11_init(Socket *, char *, void *);
extern void x11_close(Socket);
-extern void x11_send(Socket, char *, int);
+extern int x11_send(Socket, char *, int);
extern void x11_invent_auth(char *, int, char *, int);
+extern void x11_unthrottle(Socket s);
+extern void x11_override_throttle(Socket s, int enable);
extern char *pfd_newconnect(Socket * s, char *hostname, int port, void *c);
extern char *pfd_addforward(char *desthost, int destport, int port);
extern void pfd_close(Socket s);
-extern void pfd_send(Socket s, char *data, int len);
+extern int pfd_send(Socket s, char *data, int len);
extern void pfd_confirm(Socket s);
+extern void pfd_unthrottle(Socket s);
+extern void pfd_override_throttle(Socket s, int enable);
+
+/*
+ * Buffer management constants. There are several of these for
+ * various different purposes:
+ *
+ * - SSH1_BUFFER_LIMIT is the amount of backlog that must build up
+ * on a local data stream before we throttle the whole SSH
+ * connection (in SSH1 only). Throttling the whole connection is
+ * pretty drastic so we set this high in the hope it won't
+ * happen very often.
+ *
+ * - SSH_MAX_BACKLOG is the amount of backlog that must build up
+ * on the SSH connection itself before we defensively throttle
+ * _all_ local data streams. This is pretty drastic too (though
+ * thankfully unlikely in SSH2 since the window mechanism should
+ * ensure that the server never has any need to throttle its end
+ * of the connection), so we set this high as well.
+ *
+ * - OUR_V2_WINSIZE is the maximum window size we present on SSH2
+ * channels.
+ */
+
+#define SSH1_BUFFER_LIMIT 32768
+#define SSH_MAX_BACKLOG 32768
+#define OUR_V2_WINSIZE 16384
/*
* Ciphers for SSH2. We miss out single-DES because it isn't
unsigned remoteid, localid;
int type;
int closes;
- struct ssh2_data_channel {
- unsigned char *outbuffer;
- unsigned outbuflen, outbufsize;
- unsigned remwindow, remmaxpkt;
- } v2;
+ union {
+ struct ssh1_data_channel {
+ int throttling;
+ } v1;
+ struct ssh2_data_channel {
+ bufchain outbuffer;
+ unsigned remwindow, remmaxpkt;
+ unsigned locwindow;
+ } v2;
+ } v;
union {
struct ssh_agent_channel {
unsigned char *message;
static int deferred_len = 0, deferred_size = 0;
static int ssh_version;
+static int ssh1_throttle_count;
+static int ssh_overall_bufsize;
+static int ssh_throttled_all;
+static int ssh1_stdout_throttling;
static void (*ssh_protocol) (unsigned char *in, int inlen, int ispkt);
static void ssh1_protocol(unsigned char *in, int inlen, int ispkt);
static void ssh2_protocol(unsigned char *in, int inlen, int ispkt);
static void ssh_size(void);
static void ssh_special(Telnet_Special);
-static void ssh2_try_send(struct ssh_channel *c);
+static int ssh2_try_send(struct ssh_channel *c);
static void ssh2_add_channel_data(struct ssh_channel *c, char *buf,
int len);
-
+static void ssh_throttle_all(int enable, int bufsize);
+static void ssh2_set_window(struct ssh_channel *c, unsigned newwin);
static int (*s_rdpkt) (unsigned char **data, int *datalen);
+static int ssh_sendbuffer(void);
static struct rdpkt1_state_tag {
long len, pad, biglen, to_read;
static void s_wrpkt(void)
{
- int len;
+ int len, backlog;
len = s_wrpkt_prepare();
- sk_write(s, pktout.data, len);
+ backlog = sk_write(s, pktout.data, len);
+ if (backlog > SSH_MAX_BACKLOG)
+ ssh_throttle_all(1, backlog);
}
static void s_wrpkt_defer(void)
*/
static void ssh2_pkt_send(void)
{
- int len = ssh2_pkt_construct();
- sk_write(s, pktout.data, len);
+ int len;
+ int backlog;
+ len = ssh2_pkt_construct();
+ backlog = sk_write(s, pktout.data, len);
+ if (backlog > SSH_MAX_BACKLOG)
+ ssh_throttle_all(1, backlog);
}
/*
*/
static void ssh_pkt_defersend(void)
{
- sk_write(s, deferred_send_data, deferred_len);
+ int backlog;
+ backlog = sk_write(s, deferred_send_data, deferred_len);
deferred_len = deferred_size = 0;
sfree(deferred_send_data);
deferred_send_data = NULL;
+ if (backlog > SSH_MAX_BACKLOG)
+ ssh_throttle_all(1, backlog);
}
#if 0
return 1;
}
+/*
+ * Plug `sent' callback for the SSH socket. Once the socket's send
+ * backlog has dropped below SSH_MAX_BACKLOG, lift the global
+ * throttle on local data streams (a no-op if it was not in force).
+ */
+static void ssh_sent(Plug plug, int bufsize)
+{
+    /* Still backed up: leave everything throttled. */
+    if (bufsize >= SSH_MAX_BACKLOG)
+        return;
+
+    ssh_throttle_all(0, bufsize);
+}
+
/*
* Connect to specified host and port.
* Returns an error message, or NULL on success.
{
static struct plug_function_table fn_table = {
ssh_closing,
- ssh_receive
+ ssh_receive,
+ ssh_sent,
+ NULL
}, *fn_table_ptr = &fn_table;
SockAddr addr;
}
/*
+ * Throttle or unthrottle the SSH connection.
+ */
+/*
+ * Adjust the count of reasons for throttling the SSH1 connection by
+ * `adjust'. The SSH socket is frozen when the count rises from zero
+ * and unfrozen when it returns to zero; other changes leave the
+ * socket state alone.
+ */
+static void ssh1_throttle(int adjust)
+{
+    int was_throttled = (ssh1_throttle_count != 0);
+    int now_throttled;
+
+    ssh1_throttle_count += adjust;
+    assert(ssh1_throttle_count >= 0);
+
+    now_throttled = (ssh1_throttle_count != 0);
+    if (now_throttled != was_throttled)
+        sk_set_frozen(s, now_throttled);
+}
+
+/*
+ * Throttle or unthrottle _all_ local data streams (for when sends
+ * on the SSH connection itself back up).
+ */
+/*
+ * enable:  nonzero to freeze every local data stream, zero to
+ *          release the freeze.
+ * bufsize: current send backlog on the SSH socket, recorded in
+ *          ssh_overall_bufsize whenever the state actually changes.
+ *
+ * Does nothing if the requested state is already in force.
+ */
+static void ssh_throttle_all(int enable, int bufsize)
+{
+    int i;
+    struct ssh_channel *c;
+
+    if (enable == ssh_throttled_all)
+        return;
+    ssh_throttled_all = enable;
+    ssh_overall_bufsize = bufsize;
+    if (!ssh_channels)
+        return;
+    for (i = 0; NULL != (c = index234(ssh_channels, i)); i++) {
+        switch (c->type) {
+          case CHAN_MAINSESSION:
+            /*
+             * This is treated separately, outside the switch.
+             */
+            break;
+          case CHAN_X11:
+            x11_override_throttle(c->u.x11.s, enable);
+            break;
+          case CHAN_AGENT:
+            /* Agent channels require no buffer management. */
+            break;
+          case CHAN_SOCKDATA:
+            /* Port forwardings keep their socket in the pfd member. */
+            pfd_override_throttle(c->u.pfd.s, enable);
+            break;
+        }
+    }
+}
+
+/*
* Handle the key exchange and user authentication phases.
*/
static int do_ssh1_login(unsigned char *in, int inlen, int ispkt)
}
}
-void sshfwd_write(struct ssh_channel *c, char *buf, int len)
+int sshfwd_write(struct ssh_channel *c, char *buf, int len)
{
if (ssh_version == 1) {
send_packet(SSH1_MSG_CHANNEL_DATA,
PKT_INT, c->remoteid,
PKT_INT, len, PKT_DATA, buf, len, PKT_END);
+ /*
+ * In SSH1 we can return 0 here - implying that forwarded
+ * connections are never individually throttled - because
+ * the only circumstance that can cause throttling will be
+ * the whole SSH connection backing up, in which case
+ * _everything_ will be throttled as a whole.
+ */
+ return 0;
} else {
ssh2_add_channel_data(c, buf, len);
- ssh2_try_send(c);
+ return ssh2_try_send(c);
+ }
+}
+
+/*
+ * Called when the local socket behind a forwarded channel has
+ * cleared some of its send backlog; `bufsize' is the backlog that
+ * remains.
+ *
+ * SSH1: if this channel had throttled the connection and the
+ * backlog is now below SSH1_BUFFER_LIMIT, drop its throttle.
+ * SSH2: offer the peer whatever part of OUR_V2_WINSIZE is not
+ * taken up by the backlog. Nothing is offered while the backlog
+ * fills or exceeds the window, because the difference would go
+ * negative and wrap in ssh2_set_window's unsigned parameter.
+ */
+void sshfwd_unthrottle(struct ssh_channel *c, int bufsize)
+{
+    if (ssh_version == 1) {
+        if (c->v.v1.throttling && bufsize < SSH1_BUFFER_LIMIT) {
+            c->v.v1.throttling = 0;
+            ssh1_throttle(-1);
+        }
+    } else if (bufsize < OUR_V2_WINSIZE) {
+        ssh2_set_window(c, OUR_V2_WINSIZE - bufsize);
}
}
if (pktin.type == SSH1_SMSG_STDOUT_DATA ||
pktin.type == SSH1_SMSG_STDERR_DATA) {
long len = GET_32BIT(pktin.body);
- from_backend(pktin.type == SSH1_SMSG_STDERR_DATA,
- pktin.body + 4, len);
+ int bufsize =
+ from_backend(pktin.type == SSH1_SMSG_STDERR_DATA,
+ pktin.body + 4, len);
+ if (bufsize > SSH1_BUFFER_LIMIT) {
+ ssh1_stdout_throttling = 1;
+ ssh1_throttle(+1);
+ }
} else if (pktin.type == SSH1_MSG_DISCONNECT) {
ssh_state = SSH_STATE_CLOSED;
logevent("Received disconnect request");
c->remoteid = GET_32BIT(pktin.body);
c->localid = alloc_channel_id();
c->closes = 0;
+ c->v.v1.throttling = 0;
c->type = CHAN_X11; /* identify channel type */
add234(ssh_channels, c);
send_packet(SSH1_MSG_CHANNEL_OPEN_CONFIRMATION,
c->remoteid = GET_32BIT(pktin.body);
c->localid = alloc_channel_id();
c->closes = 0;
+ c->v.v1.throttling = 0;
c->type = CHAN_AGENT; /* identify channel type */
c->u.a.lensofar = 0;
add234(ssh_channels, c);
c->remoteid = GET_32BIT(pktin.body);
c->localid = alloc_channel_id();
c->closes = 0;
+ c->v.v1.throttling = 0;
c->type = CHAN_SOCKDATA; /* identify channel type */
add234(ssh_channels, c);
send_packet(SSH1_MSG_CHANNEL_OPEN_CONFIRMATION,
struct ssh_channel *c;
c = find234(ssh_channels, &i, ssh_channelfind);
if (c) {
+ int bufsize;
switch (c->type) {
case CHAN_X11:
- x11_send(c->u.x11.s, p, len);
+ bufsize = x11_send(c->u.x11.s, p, len);
break;
case CHAN_SOCKDATA:
- pfd_send(c->u.pfd.s, p, len);
+ bufsize = pfd_send(c->u.pfd.s, p, len);
break;
case CHAN_AGENT:
/* Data for an agent message. Buffer it. */
c->u.a.lensofar = 0;
}
}
+ bufsize = 0; /* agent channels never back up */
break;
}
+ if (bufsize > SSH1_BUFFER_LIMIT) {
+ c->v.v1.throttling = 1;
+ ssh1_throttle(+1);
+ }
}
} else if (pktin.type == SSH1_SMSG_SUCCESS) {
/* may be from EXEC_SHELL on some servers */
static void ssh2_add_channel_data(struct ssh_channel *c, char *buf,
int len)
{
- if (c->v2.outbufsize < c->v2.outbuflen + len) {
- c->v2.outbufsize = c->v2.outbuflen + len + 1024;
- c->v2.outbuffer = srealloc(c->v2.outbuffer, c->v2.outbufsize);
- }
- memcpy(c->v2.outbuffer + c->v2.outbuflen, buf, len);
- c->v2.outbuflen += len;
+ bufchain_add(&c->v.v2.outbuffer, buf, len);
}
/*
* Attempt to send data on an SSH2 channel.
*/
-static void ssh2_try_send(struct ssh_channel *c)
+static int ssh2_try_send(struct ssh_channel *c)
{
- while (c->v2.remwindow > 0 && c->v2.outbuflen > 0) {
- unsigned len = c->v2.remwindow;
- if (len > c->v2.outbuflen)
- len = c->v2.outbuflen;
- if (len > c->v2.remmaxpkt)
- len = c->v2.remmaxpkt;
+ while (c->v.v2.remwindow > 0 && bufchain_size(&c->v.v2.outbuffer) > 0) {
+ int len;
+ void *data;
+ bufchain_prefix(&c->v.v2.outbuffer, &data, &len);
+ if ((unsigned)len > c->v.v2.remwindow)
+ len = c->v.v2.remwindow;
+ if ((unsigned)len > c->v.v2.remmaxpkt)
+ len = c->v.v2.remmaxpkt;
ssh2_pkt_init(SSH2_MSG_CHANNEL_DATA);
ssh2_pkt_adduint32(c->remoteid);
ssh2_pkt_addstring_start();
- ssh2_pkt_addstring_data(c->v2.outbuffer, len);
+ ssh2_pkt_addstring_data(data, len);
ssh2_pkt_send();
- c->v2.outbuflen -= len;
- memmove(c->v2.outbuffer, c->v2.outbuffer + len, c->v2.outbuflen);
- c->v2.remwindow -= len;
+ bufchain_consume(&c->v.v2.outbuffer, len);
+ c->v.v2.remwindow -= len;
+ }
+
+ /*
+ * After having sent as much data as we can, return the amount
+ * still buffered.
+ */
+ return bufchain_size(&c->v.v2.outbuffer);
+}
+
+/*
+ * Potentially enlarge the window on an SSH2 channel.
+ */
+/*
+ * Raise our advertised receive window on channel `c' to `newwin'
+ * by sending SSH2_MSG_CHANNEL_WINDOW_ADJUST for the difference.
+ * The window is never shrunk: a `newwin' at or below the current
+ * value sends nothing.
+ */
+static void ssh2_set_window(struct ssh_channel *c, unsigned newwin)
+{
+    if (c->v.v2.locwindow < newwin) {
+        unsigned grow = newwin - c->v.v2.locwindow;
+
+        ssh2_pkt_init(SSH2_MSG_CHANNEL_WINDOW_ADJUST);
+        ssh2_pkt_adduint32(c->remoteid);
+        ssh2_pkt_adduint32(grow);
+        ssh2_pkt_send();
+        c->v.v2.locwindow = newwin;
}
}
ssh2_pkt_init(SSH2_MSG_CHANNEL_OPEN);
ssh2_pkt_addstring("session");
ssh2_pkt_adduint32(mainchan->localid);
- ssh2_pkt_adduint32(0x8000UL); /* our window size */
+ mainchan->v.v2.locwindow = OUR_V2_WINSIZE;
+ ssh2_pkt_adduint32(mainchan->v.v2.locwindow); /* our window size */
ssh2_pkt_adduint32(0x4000UL); /* our max pkt size */
ssh2_pkt_send();
crWaitUntilV(ispkt);
mainchan->remoteid = ssh2_pkt_getuint32();
mainchan->type = CHAN_MAINSESSION;
mainchan->closes = 0;
- mainchan->v2.remwindow = ssh2_pkt_getuint32();
- mainchan->v2.remmaxpkt = ssh2_pkt_getuint32();
- mainchan->v2.outbuffer = NULL;
- mainchan->v2.outbuflen = mainchan->v2.outbufsize = 0;
+ mainchan->v.v2.remwindow = ssh2_pkt_getuint32();
+ mainchan->v.v2.remmaxpkt = ssh2_pkt_getuint32();
+ bufchain_init(&mainchan->v.v2.outbuffer);
add234(ssh_channels, mainchan);
logevent("Opened channel for session");
c = find234(ssh_channels, &i, ssh_channelfind);
if (!c)
continue; /* nonexistent channel */
- c->v2.remwindow += ssh2_pkt_getuint32();
+ c->v.v2.remwindow += ssh2_pkt_getuint32();
}
} while (pktin.type == SSH2_MSG_CHANNEL_WINDOW_ADJUST);
c = find234(ssh_channels, &i, ssh_channelfind);
if (!c)
continue;/* nonexistent channel */
- c->v2.remwindow += ssh2_pkt_getuint32();
+ c->v.v2.remwindow += ssh2_pkt_getuint32();
}
} while (pktin.type == SSH2_MSG_CHANNEL_WINDOW_ADJUST);
c = find234(ssh_channels, &i, ssh_channelfind);
if (!c)
continue; /* nonexistent channel */
- c->v2.remwindow += ssh2_pkt_getuint32();
+ c->v.v2.remwindow += ssh2_pkt_getuint32();
}
} while (pktin.type == SSH2_MSG_CHANNEL_WINDOW_ADJUST);
c = find234(ssh_channels, &i, ssh_channelfind);
if (!c)
continue; /* nonexistent channel */
- c->v2.remwindow += ssh2_pkt_getuint32();
+ c->v.v2.remwindow += ssh2_pkt_getuint32();
}
} while (pktin.type == SSH2_MSG_CHANNEL_WINDOW_ADJUST);
c = find234(ssh_channels, &i, ssh_channelfind);
if (!c)
continue; /* nonexistent channel */
- c->v2.remwindow += ssh2_pkt_getuint32();
+ c->v.v2.remwindow += ssh2_pkt_getuint32();
}
} while (pktin.type == SSH2_MSG_CHANNEL_WINDOW_ADJUST);
if (pktin.type != SSH2_MSG_CHANNEL_SUCCESS) {
continue; /* extended but not stderr */
ssh2_pkt_getstring(&data, &length);
if (data) {
+ int bufsize;
+ c->v.v2.locwindow -= length;
switch (c->type) {
case CHAN_MAINSESSION:
- from_backend(pktin.type ==
- SSH2_MSG_CHANNEL_EXTENDED_DATA, data,
- length);
+ bufsize =
+ from_backend(pktin.type ==
+ SSH2_MSG_CHANNEL_EXTENDED_DATA,
+ data, length);
break;
case CHAN_X11:
- x11_send(c->u.x11.s, data, length);
+ bufsize = x11_send(c->u.x11.s, data, length);
break;
case CHAN_SOCKDATA:
- pfd_send(c->u.pfd.s, data, length);
+ bufsize = pfd_send(c->u.pfd.s, data, length);
break;
case CHAN_AGENT:
while (length > 0) {
c->u.a.lensofar = 0;
}
}
+ bufsize = 0;
break;
}
/*
- * Enlarge the window again at the remote
- * side, just in case it ever runs down and
- * they fail to send us any more data.
+ * If we are not buffering too much data,
+ * enlarge the window again at the remote side.
*/
- ssh2_pkt_init(SSH2_MSG_CHANNEL_WINDOW_ADJUST);
- ssh2_pkt_adduint32(c->remoteid);
- ssh2_pkt_adduint32(length);
- ssh2_pkt_send();
+ if (bufsize < OUR_V2_WINSIZE)
+ ssh2_set_window(c, OUR_V2_WINSIZE - bufsize);
}
} else if (pktin.type == SSH2_MSG_DISCONNECT) {
ssh_state = SSH_STATE_CLOSED;
break;
}
del234(ssh_channels, c);
- sfree(c->v2.outbuffer);
+ bufchain_clear(&c->v.v2.outbuffer);
sfree(c);
/*
c = find234(ssh_channels, &i, ssh_channelfind);
if (!c)
continue; /* nonexistent channel */
- c->v2.remwindow += ssh2_pkt_getuint32();
+ c->v.v2.remwindow += ssh2_pkt_getuint32();
try_send = TRUE;
} else if (pktin.type == SSH2_MSG_CHANNEL_OPEN_CONFIRMATION) {
unsigned i = ssh2_pkt_getuint32();
c->remoteid = ssh2_pkt_getuint32();
c->type = CHAN_SOCKDATA;
c->closes = 0;
- c->v2.remwindow = ssh2_pkt_getuint32();
- c->v2.remmaxpkt = ssh2_pkt_getuint32();
- c->v2.outbuffer = NULL;
- c->v2.outbuflen = c->v2.outbufsize = 0;
+ c->v.v2.remwindow = ssh2_pkt_getuint32();
+ c->v.v2.remmaxpkt = ssh2_pkt_getuint32();
+ bufchain_init(&c->v.v2.outbuffer);
pfd_confirm(c->u.pfd.s);
} else if (pktin.type == SSH2_MSG_CHANNEL_OPEN) {
char *type;
} else {
c->localid = alloc_channel_id();
c->closes = 0;
- c->v2.remwindow = winsize;
- c->v2.remmaxpkt = pktsize;
- c->v2.outbuffer = NULL;
- c->v2.outbuflen = c->v2.outbufsize = 0;
+ c->v.v2.locwindow = OUR_V2_WINSIZE;
+ c->v.v2.remwindow = winsize;
+ c->v.v2.remmaxpkt = pktsize;
+ bufchain_init(&c->v.v2.outbuffer);
add234(ssh_channels, c);
ssh2_pkt_init(SSH2_MSG_CHANNEL_OPEN_CONFIRMATION);
ssh2_pkt_adduint32(c->remoteid);
ssh2_pkt_adduint32(c->localid);
- ssh2_pkt_adduint32(0x8000UL); /* our window size */
+ ssh2_pkt_adduint32(c->v.v2.locwindow);
ssh2_pkt_adduint32(0x4000UL); /* our max pkt size */
ssh2_pkt_send();
}
/*
* Try to send data on all channels if we can.
*/
- for (i = 0; NULL != (c = index234(ssh_channels, i)); i++)
- ssh2_try_send(c);
+ for (i = 0; NULL != (c = index234(ssh_channels, i)); i++) {
+ int bufsize = ssh2_try_send(c);
+ if (bufsize == 0) {
+ switch (c->type) {
+ case CHAN_MAINSESSION:
+ /* stdin need not receive an unthrottle
+ * notification since it will be polled */
+ break;
+ case CHAN_X11:
+ x11_unthrottle(c->u.x11.s);
+ break;
+ case CHAN_AGENT:
+ /* agent sockets are request/response and need no
+ * buffer management */
+ break;
+ case CHAN_SOCKDATA:
+ pfd_unthrottle(c->u.pfd.s);
+ break;
+ }
+ }
+ }
}
}
ssh_send_ok = 0;
ssh_editing = 0;
ssh_echoing = 0;
+ ssh1_throttle_count = 0;
+ ssh_overall_bufsize = 0;
p = connect_to_host(host, port, realhost);
if (p != NULL)
/*
* Called to send data down the Telnet connection.
*/
-static void ssh_send(char *buf, int len)
+static int ssh_send(char *buf, int len)
{
if (s == NULL || ssh_protocol == NULL)
- return;
+ return 0; /* ssh_protocol was never called, so report an empty backlog */
ssh_protocol(buf, len, 0);
+
+ return ssh_sendbuffer(); /* tell the caller how much stdin data is now buffered */
+}
+
+/*
+ * Called to query the current amount of buffered stdin data.
+ *
+ * Returns 0 when s or ssh_protocol is NULL, or when ssh_version
+ * is neither 1 nor 2. Otherwise the result starts from
+ * ssh_overall_bufsize (counted only while ssh_throttled_all is
+ * set). For SSH1 that is the whole answer. For SSH2 the size of
+ * the main channel's outbuffer is added, provided mainchan
+ * exists and its closes field is still zero.
+ */
+static int ssh_sendbuffer(void)
+{
+ int override_value;
+
+ if (s == NULL || ssh_protocol == NULL)
+ return 0;
+
+ /*
+ * If the SSH socket itself has backed up, add the total backup
+ * size on that to any individual buffer on the stdin channel.
+ */
+ override_value = 0;
+ if (ssh_throttled_all)
+ override_value = ssh_overall_bufsize;
+
+ if (ssh_version == 1) {
+ return override_value; /* SSH1: only the overall backlog is reported */
+ } else if (ssh_version == 2) {
+ if (!mainchan || mainchan->closes > 0)
+ return override_value; /* no usable main channel: overall backlog only */
+ else
+ return override_value + bufchain_size(&mainchan->v.v2.outbuffer);
+ }
+
+ return 0; /* reached only when ssh_version is neither 1 nor 2 */
}
/*
return c;
}
+/*
+ * This is called when stdout/stderr (the entity to which
+ * from_backend sends data) manages to clear some backlog.
+ * bufsize is presumably the backlog that consumer still holds.
+ *
+ * SSH1: once the backlog is below SSH1_BUFFER_LIMIT, drop the
+ * stdout throttle. ssh1_throttle(-1) is only issued if
+ * ssh1_stdout_throttling was actually set, so that repeated
+ * notifications cannot release the throttle more than once.
+ *
+ * Otherwise (SSH2): if mainchan exists and its closes field is
+ * still zero, offer the remote side whatever window is left
+ * after the backlog. As in the data-receipt path, nothing is
+ * done while the backlog takes up all of OUR_V2_WINSIZE, so
+ * ssh2_set_window never sees a zero or negative size from here.
+ */
+void ssh_unthrottle(int bufsize)
+{
+ if (ssh_version == 1) {
+ if (ssh1_stdout_throttling && bufsize < SSH1_BUFFER_LIMIT) {
+ ssh1_stdout_throttling = 0;
+ ssh1_throttle(-1);
+ }
+ } else {
+ if (mainchan && mainchan->closes == 0 && bufsize < OUR_V2_WINSIZE)
+ ssh2_set_window(mainchan, OUR_V2_WINSIZE - bufsize);
+ }
+}
+
void ssh_send_port_open(void *channel, char *hostname, int port, char *org)
{
struct ssh_channel *c = (struct ssh_channel *)channel;
ssh2_pkt_init(SSH2_MSG_CHANNEL_OPEN);
ssh2_pkt_addstring("direct-tcpip");
ssh2_pkt_adduint32(c->localid);
- ssh2_pkt_adduint32(0x8000UL); /* our window size */
+ c->v.v2.locwindow = OUR_V2_WINSIZE;
+ ssh2_pkt_adduint32(c->v.v2.locwindow);/* our window size */
ssh2_pkt_adduint32(0x4000UL); /* our max pkt size */
ssh2_pkt_addstring(hostname);
ssh2_pkt_adduint32(port);
Backend ssh_backend = {
ssh_init,
ssh_send,
+ ssh_sendbuffer, /* query: amount of stdin data currently buffered */
ssh_size,
ssh_special,
ssh_socket,
ssh_sendok,
ssh_ldisc,
+ ssh_unthrottle, /* notification: the from_backend consumer cleared some backlog */
22
};
#include "puttymem.h"
#include "network.h"
+extern void sshfwd_close(struct ssh_channel *c);
+extern int sshfwd_write(struct ssh_channel *c, char *, int);
+extern void sshfwd_unthrottle(struct ssh_channel *c, int bufsize);
+
/*
* Useful thing.
*/
&o_we_sga, &o_they_sga, NULL
};
+#define TELNET_MAX_BACKLOG 4096
+
static int echoing = TRUE, editing = TRUE;
static int activated = FALSE;
-
+static int telnet_bufsize;
static int in_synch;
static int sb_opt, sb_len;
static char *sb_buf = NULL;
static void c_write1(int c)
{
+ int backlog; /* value reported by from_backend for this write */
char cc = (char) c;
- from_backend(0, &cc, 1);
+ backlog = from_backend(0, &cc, 1);
+ sk_set_frozen(s, backlog > TELNET_MAX_BACKLOG); /* freeze the socket while the backlog is over the limit, thaw it otherwise */
}
static void log_option(char *sender, int cmd, int option)
b[0] = IAC;
b[1] = cmd;
b[2] = option;
- sk_write(s, b, 3);
+ telnet_bufsize = sk_write(s, b, 3);
log_option("client", cmd, option);
}
n = 4 + strlen(cfg.termspeed);
b[n] = IAC;
b[n + 1] = SE;
- sk_write(s, b, n + 2);
+ telnet_bufsize = sk_write(s, b, n + 2);
logevent("server:\tSB TSPEED SEND");
sprintf(logbuf, "client:\tSB TSPEED IS %s", cfg.termspeed);
logevent(logbuf);
'a' : cfg.termtype[n]);
b[n + 4] = IAC;
b[n + 5] = SE;
- sk_write(s, b, n + 6);
+ telnet_bufsize = sk_write(s, b, n + 6);
b[n + 4] = 0;
logevent("server:\tSB TTYPE SEND");
sprintf(logbuf, "client:\tSB TTYPE IS %s", b + 4);
}
b[n++] = IAC;
b[n++] = SE;
- sk_write(s, b, n);
+ telnet_bufsize = sk_write(s, b, n);
sprintf(logbuf, "client:\tSB %s IS %s", telopt(sb_opt),
n == 6 ? "<nothing>" : "<stuff>");
logevent(logbuf);
/*
* Called to send data down the Telnet connection.
*/
-static void telnet_send(char *buf, int len)
+static int telnet_send(char *buf, int len)
{
char *p;
static unsigned char iac[2] = { IAC, IAC };
#endif
if (s == NULL)
- return;
+ return 0;
p = buf;
while (p < buf + len) {
while (iswritable((unsigned char) *p) && p < buf + len)
p++;
- sk_write(s, q, p - q);
+ telnet_bufsize = sk_write(s, q, p - q);
while (p < buf + len && !iswritable((unsigned char) *p)) {
- sk_write(s, (unsigned char) *p == IAC ? iac : cr, 2);
+ telnet_bufsize =
+ sk_write(s, (unsigned char) *p == IAC ? iac : cr, 2);
p++;
}
}
+
+ return telnet_bufsize;
+}
+
+/*
+ * Called to query the current socket sendability status.
+ *
+ * Returns telnet_bufsize, which the visible code in this backend
+ * sets from the return value of each sk_write() / sk_write_oob()
+ * call. NOTE(review): whether it is also refreshed when the
+ * socket drains later is not visible here - confirm.
+ */
+static int telnet_sendbuffer(void)
+{
+ return telnet_bufsize;
}
/*
b[6] = rows & 0xFF;
b[7] = IAC;
b[8] = SE;
- sk_write(s, b, 9);
+ telnet_bufsize = sk_write(s, b, 9);
sprintf(logbuf, "client:\tSB NAWS %d,%d",
((unsigned char) b[3] << 8) + (unsigned char) b[4],
((unsigned char) b[5] << 8) + (unsigned char) b[6]);
switch (code) {
case TS_AYT:
b[1] = AYT;
- sk_write(s, b, 2);
+ telnet_bufsize = sk_write(s, b, 2);
break;
case TS_BRK:
b[1] = BREAK;
- sk_write(s, b, 2);
+ telnet_bufsize = sk_write(s, b, 2);
break;
case TS_EC:
b[1] = EC;
- sk_write(s, b, 2);
+ telnet_bufsize = sk_write(s, b, 2);
break;
case TS_EL:
b[1] = EL;
- sk_write(s, b, 2);
+ telnet_bufsize = sk_write(s, b, 2);
break;
case TS_GA:
b[1] = GA;
- sk_write(s, b, 2);
+ telnet_bufsize = sk_write(s, b, 2);
break;
case TS_NOP:
b[1] = NOP;
- sk_write(s, b, 2);
+ telnet_bufsize = sk_write(s, b, 2);
break;
case TS_ABORT:
b[1] = ABORT;
- sk_write(s, b, 2);
+ telnet_bufsize = sk_write(s, b, 2);
break;
case TS_AO:
b[1] = AO;
- sk_write(s, b, 2);
+ telnet_bufsize = sk_write(s, b, 2);
break;
case TS_IP:
b[1] = IP;
- sk_write(s, b, 2);
+ telnet_bufsize = sk_write(s, b, 2);
break;
case TS_SUSP:
b[1] = SUSP;
- sk_write(s, b, 2);
+ telnet_bufsize = sk_write(s, b, 2);
break;
case TS_EOR:
b[1] = EOR;
- sk_write(s, b, 2);
+ telnet_bufsize = sk_write(s, b, 2);
break;
case TS_EOF:
b[1] = xEOF;
- sk_write(s, b, 2);
+ telnet_bufsize = sk_write(s, b, 2);
break;
case TS_EOL:
- sk_write(s, "\r\n", 2);
+ telnet_bufsize = sk_write(s, "\r\n", 2);
break;
case TS_SYNCH:
b[1] = DM;
- sk_write(s, b, 1);
- sk_write_oob(s, b + 1, 1);
+ telnet_bufsize = sk_write(s, b, 1);
+ telnet_bufsize = sk_write_oob(s, b + 1, 1);
break;
case TS_RECHO:
if (o_echo.state == INACTIVE || o_echo.state == REALLY_INACTIVE) {
case TS_PING:
if (o_they_sga.state == ACTIVE) {
b[1] = NOP;
- sk_write(s, b, 2);
+ telnet_bufsize = sk_write(s, b, 2);
}
break;
}
return 1;
}
+static void telnet_unthrottle(int backlog)
+{ /* Backend unthrottle hook (listed in telnet_backend); backlog is the size reported by the caller. */
+ sk_set_frozen(s, backlog > TELNET_MAX_BACKLOG); /* stay frozen only while the backlog is still over the limit */
+}
+
static int telnet_ldisc(int option)
{
if (option == LD_ECHO)
Backend telnet_backend = {
telnet_init,
telnet_send,
+ telnet_sendbuffer, /* query: returns the recorded telnet_bufsize */
telnet_size,
telnet_special,
telnet_socket,
telnet_sendok,
telnet_ldisc,
+ telnet_unthrottle, /* notification: re-evaluates whether the socket stays frozen */
23
};
/*
* from_backend(), to get data from the backend for the terminal.
*/
-void from_backend(int is_stderr, char *data, int len)
+int from_backend(int is_stderr, char *data, int len)
{
while (len--) {
if (inbuf_head >= INBUF_SIZE)
term_out();
inbuf[inbuf_head++] = *data++;
}
+
+ /*
+ * We process all stdout/stderr data immediately we receive it,
+ * and don't return until it's all gone. Therefore, there's no
+ * reason at all to return anything other than zero from this
+ * function.
+ *
+ * This is a slightly suboptimal way to deal with SSH2 - in
+ * principle, the window mechanism would allow us to continue
+ * to accept data on forwarded ports and X connections even
+ * while the terminal processing was going slowly - but we
+ * can't do the 100% right thing without moving the terminal
+ * processing into a separate thread, and that might hurt
+ * portability. So we manage stdout buffering the old SSH1 way:
+ * if the terminal processing goes slowly, the whole SSH
+ * connection stops accepting data until it's ready.
+ *
+ * In practice, I can't imagine this causing serious trouble.
+ *
+ * Callers treat the return value as a backlog size: the
+ * Telnet backend's c_write1, for instance, compares it with
+ * TELNET_MAX_BACKLOG to decide whether to freeze its socket.
+ * The is_stderr argument is not consulted in this function.
+ */
+ return 0;
}
/*
len = TranslateKey(message, wParam, lParam, buf);
if (len == -1)
return DefWindowProc(hwnd, message, wParam, lParam);
+
+ /*
+ * We need not bother about stdin backlogs here,
+ * because in GUI PuTTY we can't do anything about
+ * it anyway; there's no means of asking Windows to
+ * hold off on KEYDOWN messages. We _have_ to
+ * buffer everything we're sent.
+ */
ldisc_send(buf, len);
if (len > 0)
luni_send(&keybuf, 1);
} else {
ch = (char) alt_sum;
+ /*
+ * We need not bother about stdin
+ * backlogs here, because in GUI PuTTY
+ * we can't do anything about it
+ * anyway; there's no means of asking
+ * Windows to hold off on KEYDOWN
+ * messages. We _have_ to buffer
+ * everything we're sent.
+ */
ldisc_send(&ch, 1);
}
alt_sum = 0;
#include <windows.h>
#include <stdio.h>
#include <stdlib.h>
+#include <assert.h>
#define DEFINE_PLUG_METHOD_MACROS
#include "putty.h"
#include "network.h"
#include "tree234.h"
-#define BUFFER_GRANULE 512
-
struct Socket_tag {
struct socket_function_table *fn;
/* the above variable absolutely *must* be the first in this structure */
SOCKET s;
Plug plug;
void *private_ptr;
- struct buffer *head, *tail;
+ bufchain output_data;
int writable;
- int frozen; /* this tells the write stuff not to even bother trying to send at this point */
+ int frozen; /* this causes readability notifications to be ignored */
+ int frozen_readable; /* this means we missed at least one readability
+ * notification while we were frozen */
+ char oobdata[1];
int sending_oob;
int oobinline;
};
#endif
};
-struct buffer {
- struct buffer *next;
- int buflen, bufpos;
- char buf[BUFFER_GRANULE];
-};
-
static tree234 *sktree;
static int cmpfortree(void *av, void *bv)
}
static void sk_tcp_close(Socket s);
-static void sk_tcp_write(Socket s, char *data, int len);
-static void sk_tcp_write_oob(Socket s, char *data, int len);
+static int sk_tcp_write(Socket s, char *data, int len);
+static int sk_tcp_write_oob(Socket s, char *data, int len);
static char *sk_tcp_socket_error(Socket s);
extern char *do_select(SOCKET skt, int startup);
ret->fn = &fn_table;
ret->error = NULL;
ret->plug = plug;
- ret->head = ret->tail = NULL;
+ bufchain_init(&ret->output_data);
ret->writable = 1; /* to start with */
ret->sending_oob = 0;
ret->frozen = 1;
+ ret->frozen_readable = 0;
ret->s = (SOCKET)sock;
ret->fn = &fn_table;
ret->error = NULL;
ret->plug = plug;
- ret->head = ret->tail = NULL;
+ bufchain_init(&ret->output_data);
ret->writable = 1; /* to start with */
ret->sending_oob = 0;
ret->frozen = 0;
+ ret->frozen_readable = 0;
/*
* Open socket.
ret->fn = &fn_table;
ret->error = NULL;
ret->plug = plug;
- ret->head = ret->tail = NULL;
+ bufchain_init(&ret->output_data);
ret->writable = 0; /* to start with */
ret->sending_oob = 0;
ret->frozen = 0;
+ ret->frozen_readable = 0;
/*
* Open socket.
*/
void try_send(Actual_Socket s)
{
- if (s->frozen) return;
- while (s->head) {
+ while (s->sending_oob || bufchain_size(&s->output_data) > 0) {
int nsent;
DWORD err;
+ void *data;
int len, urgentflag;
if (s->sending_oob) {
urgentflag = MSG_OOB;
len = s->sending_oob;
+ data = &s->oobdata;
} else {
urgentflag = 0;
- len = s->head->buflen - s->head->bufpos;
+ bufchain_prefix(&s->output_data, &data, &len);
}
- nsent =
- send(s->s, s->head->buf + s->head->bufpos, len, urgentflag);
+ nsent = send(s->s, data, len, urgentflag);
noise_ultralight(nsent);
if (nsent <= 0) {
err = (nsent < 0 ? WSAGetLastError() : 0);
fatalbox(winsock_error_string(err));
}
} else {
- s->head->bufpos += nsent;
- if (s->sending_oob)
- s->sending_oob -= nsent;
- if (s->head->bufpos >= s->head->buflen) {
- struct buffer *tmp = s->head;
- s->head = tmp->next;
- sfree(tmp);
- if (!s->head)
- s->tail = NULL;
+ if (s->sending_oob) {
+ if (nsent < len) {
+ memmove(s->oobdata, s->oobdata+nsent, len-nsent);
+ s->sending_oob = len - nsent;
+ } else {
+ s->sending_oob = 0;
+ }
+ } else {
+ bufchain_consume(&s->output_data, nsent);
}
}
}
}
-static void sk_tcp_write(Socket sock, char *buf, int len)
+static int sk_tcp_write(Socket sock, char *buf, int len)
{
Actual_Socket s = (Actual_Socket) sock;
/*
* Add the data to the buffer list on the socket.
*/
- if (s->tail && s->tail->buflen < BUFFER_GRANULE) {
- int copylen = min(len, BUFFER_GRANULE - s->tail->buflen);
- memcpy(s->tail->buf + s->tail->buflen, buf, copylen);
- buf += copylen;
- len -= copylen;
- s->tail->buflen += copylen;
- }
- while (len > 0) {
- int grainlen = min(len, BUFFER_GRANULE);
- struct buffer *newbuf;
- newbuf = smalloc(sizeof(struct buffer));
- newbuf->bufpos = 0;
- newbuf->buflen = grainlen;
- memcpy(newbuf->buf, buf, grainlen);
- buf += grainlen;
- len -= grainlen;
- if (s->tail)
- s->tail->next = newbuf;
- else
- s->head = s->tail = newbuf;
- newbuf->next = NULL;
- s->tail = newbuf;
- }
+ bufchain_add(&s->output_data, buf, len); /* the shared bufchain code replaces the hand-rolled granule list removed above */
/*
* Now try sending from the start of the buffer list.
*/
if (s->writable)
try_send(s);
+
+ return bufchain_size(&s->output_data); /* bytes still queued on the socket after the send attempt */
}
-static void sk_tcp_write_oob(Socket sock, char *buf, int len)
+static int sk_tcp_write_oob(Socket sock, char *buf, int len)
{
Actual_Socket s = (Actual_Socket) sock;
/*
* Replace the buffer list on the socket with the data.
*/
- if (!s->head) {
- s->head = smalloc(sizeof(struct buffer));
- } else {
- struct buffer *walk = s->head->next;
- while (walk) {
- struct buffer *tmp = walk;
- walk = tmp->next;
- sfree(tmp);
- }
- }
- s->head->next = NULL;
- s->tail = s->head;
- s->head->buflen = len;
- memcpy(s->head->buf, buf, len);
-
- /*
- * Set the Urgent marker.
- */
+ bufchain_clear(&s->output_data);
+ assert(len <= sizeof(s->oobdata));
+ memcpy(s->oobdata, buf, len);
s->sending_oob = len;
/*
*/
if (s->writable)
try_send(s);
+
+ return s->sending_oob;
}
int select_result(WPARAM wParam, LPARAM lParam)
switch (WSAGETSELECTEVENT(lParam)) {
case FD_READ:
-
/* In the case the socket is still frozen, we don't even bother */
- if (s->frozen)
+ if (s->frozen) {
+ s->frozen_readable = 1;
break;
+ }
/*
* We have received data on the socket. For an oobinline
}
break;
case FD_WRITE:
- s->writable = 1;
- try_send(s);
+ {
+ int bufsize_before, bufsize_after;
+ s->writable = 1;
+ bufsize_before = s->sending_oob + bufchain_size(&s->output_data);
+ try_send(s);
+ bufsize_after = s->sending_oob + bufchain_size(&s->output_data);
+ if (bufsize_after < bufsize_before)
+ plug_sent(s->plug, bufsize_after);
+ }
break;
case FD_CLOSE:
/* Signal a close on the socket. First read any outstanding data. */
void sk_set_frozen(Socket sock, int is_frozen)
{
Actual_Socket s = (Actual_Socket) sock;
+ if (s->frozen == is_frozen)
+ return; /* already in the requested state: leave frozen_readable untouched */
s->frozen = is_frozen;
- if (!is_frozen) {
+ if (!is_frozen && s->frozen_readable) { /* thawing after an FD_READ was ignored; the peeking recv below presumably makes Winsock post FD_READ again - confirm */
char c;
recv(s->s, &c, 1, MSG_PEEK);
}
+ s->frozen_readable = 0; /* the missed-notification flag is reset on every actual state change */
}
/*
#define PUT_16BIT(endian, cp, val) \
(endian=='B' ? PUT_16BIT_MSB_FIRST(cp, val) : PUT_16BIT_LSB_FIRST(cp, val))
-extern void sshfwd_close(void *);
-extern void sshfwd_write(void *, char *, int);
-
struct X11Private {
struct plug_function_table *fn;
/* the above variable absolutely *must* be the first in this structure */
unsigned char *auth_data;
int data_read, auth_plen, auth_psize, auth_dlen, auth_dsize;
int verified;
+ int throttled, throttle_override;
void *c; /* data used by ssh.c */
Socket s;
};
{
struct X11Private *pr = (struct X11Private *) plug;
- sshfwd_write(pr->c, data, len);
+ if (sshfwd_write(pr->c, data, len) > 0) {
+ pr->throttled = 1;
+ sk_set_frozen(pr->s, 1);
+ }
+
return 1;
}
+static void x11_sent(Plug plug, int bufsize)
+{ /* Plug "sent" hook (listed in fn_table): invoked with the socket's remaining send backlog, bufsize. */
+ struct X11Private *pr = (struct X11Private *) plug;
+
+ sshfwd_unthrottle(pr->c, bufsize); /* pass the new backlog size on for the SSH channel stored in pr->c */
+}
+
/*
* Called to set up the raw connection.
*
{
static struct plug_function_table fn_table = {
x11_closing,
- x11_receive
+ x11_receive,
+ x11_sent,
+ NULL
};
SockAddr addr;
pr->auth_protocol = NULL;
pr->verified = 0;
pr->data_read = 0;
+ pr->throttled = pr->throttle_override = 0;
pr->c = c;
pr->s = *s = sk_new(addr, port, 0, 1, (Plug) pr);
sk_close(s);
}
+void x11_unthrottle(Socket s)
+{ /* Clears this X11 socket's own throttled flag; a NULL socket is ignored. */
+ struct X11Private *pr;
+ if (!s)
+ return;
+ pr = (struct X11Private *) sk_get_private_ptr(s);
+
+ pr->throttled = 0;
+ sk_set_frozen(s, pr->throttled || pr->throttle_override); /* the socket stays frozen while throttle_override is still set */
+}
+
+void x11_override_throttle(Socket s, int enable)
+{ /* Stores enable as this X11 socket's throttle_override flag; a NULL socket is ignored. */
+ struct X11Private *pr;
+ if (!s)
+ return;
+ pr = (struct X11Private *) sk_get_private_ptr(s);
+
+ pr->throttle_override = enable;
+ sk_set_frozen(s, pr->throttled || pr->throttle_override); /* frozen whenever either flag is set */
+}
+
/*
* Called to send data down the raw connection.
*/
-void x11_send(Socket s, char *data, int len)
+int x11_send(Socket s, char *data, int len)
{
struct X11Private *pr = (struct X11Private *) sk_get_private_ptr(s);
if (s == NULL)
- return;
+ return 0;
/*
* Read the first packet.
while (len > 0 && pr->data_read < 12)
pr->firstpkt[pr->data_read++] = (unsigned char) (len--, *data++);
if (pr->data_read < 12)
- return;
+ return 0;
/*
* If we have not allocated the auth_protocol and auth_data
pr->auth_data[pr->data_read++ - 12 -
pr->auth_psize] = (unsigned char) (len--, *data++);
if (pr->data_read < 12 + pr->auth_psize + pr->auth_dsize)
- return;
+ return 0;
/*
* If we haven't verified the authentication, do so now.
sshfwd_write(pr->c, reply, 8 + msgsize);
sshfwd_close(pr->c);
x11_close(s);
- return;
+ return 0;
}
/*
* After initialisation, just copy data simply.
*/
- sk_write(s, data, len);
+ return sk_write(s, data, len);
}