client ip address (textual)
password
target_requests_outstanding
- d data (SLIP format)
+ d data (SLIP format, with SLIP_ESC and `-' swapped)
client_cs = cs
c.client = ci
c.max_outstanding = cfg.getint(cs, 'max_requests_outstanding')
+ c.target_outstanding = cfg.getint(cs, 'target_requests_outstanding')
password = pw
def process_cfg():
process_cfg_clients(set_client)
c.routes = cfg.get('virtual','routes')
- c.max_queue_time = cfg.get(client_cs, 'max_queue_time')
+ c.max_queue_time = cfg.getint(client_cs, 'max_queue_time')
+ c.max_batch_up = cfg.getint(client_cs, 'max_batch_up')
process_cfg_ipif(client_cs,
(('local', 'client'),
def start_client():
    """Create the client's packet queue and HTTP agent.

    Rebinds two module-level globals:

    * ``queue`` -- a PacketQueue constructed with ``c.max_queue_time``;
    * ``agent`` -- a twisted.web.client.Agent on the shared reactor, using
      ``c.http_timeout`` as its connect timeout.
    """
    global queue, agent
    queue = PacketQueue(c.max_queue_time)
    agent = twisted.web.client.Agent(reactor, connectTimeout=c.http_timeout)
def outbound(packet, saddr, daddr):
    """Queue one outgoing packet, then try to start an upload.

    packet is appended to the module-level ``queue``.  saddr and daddr are
    accepted but not used here (only by the disabled debug print below).
    """
    #print('OUT ', saddr, daddr, repr(packet))
    queue.append(packet)
    check_outbound()
def req_ok(data):
    """Success callback for an upload request.

    Placeholder: ``data`` (whatever the request Deferred fired with) is
    accepted and ignored for now.
    """
    # TODO: process the response carried in `data`.


def req_err(err):
    """Error callback for an upload request: report ``err`` on stderr.

    Returns None, so the failure is treated as handled and the completion
    hook added after this callback pair still runs.
    """
    print(err, file=sys.stderr)


def req_fin(*args):
    """Completion hook: release the slot held by a finished request.

    Runs after both the success and the error path, so ``outstanding`` is
    decremented exactly once per request however it ended.  (Decrementing
    only in the error callback would leak a slot on every success.)
    """
    global outstanding
    outstanding -= 1
def _ascii_bytes(value):
    """Return value as bytes: bytes pass through, anything else is str()ed
    and ASCII-encoded."""
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)
    return str(value).encode('ascii')


def check_outbound():
    """Start upload requests until the outstanding-request limits are met.

    Each iteration batches queued packets (up to ``c.max_batch_up`` bytes)
    into one multipart/form-data POST to ``c.url`` and counts it in the
    module-level ``outstanding``.  The loop stops when

    * ``outstanding`` has reached ``c.max_outstanding``, or
    * the queue is empty and ``outstanding`` has reached
      ``c.target_outstanding``.

    While below the target, requests are sent even with nothing queued, so
    the data field may legitimately be empty.
    """
    global outstanding
    import io  # local import: only needed to wrap the request body

    crlf = b'\r\n'
    while True:
        if outstanding >= c.max_outstanding:
            break
        if not queue.nonempty() and outstanding >= c.target_outstanding:
            break

        # The callbacks have to grow the batch in place, which a lambda
        # cannot do by assignment; a bytearray gives us a mutable buffer.
        batch = bytearray()
        queue.process(lambda: len(batch), batch.extend, c.max_batch_up)

        # Every MIME part needs an empty line between its headers and its
        # content.  The metadata lines follow the documented protocol
        # order: client address, password, target_requests_outstanding.
        mime = (b'--b' + crlf +
                b'Content-Disposition: form-data; name="m"' + crlf +
                crlf +
                _ascii_bytes(c.client) + crlf +
                _ascii_bytes(password) + crlf +
                _ascii_bytes(c.target_outstanding) + crlf +
                b'--b' + crlf +
                b'Content-Disposition: form-data; name="d"' + crlf +
                crlf +
                mime_translate(bytes(batch)) + crlf +
                b'--b--' + crlf)

        hh = {'User-Agent': ['hippotat'],
              'Content-Type': ['multipart/form-data; boundary="b"']}
        # Agent.request takes the body as a producer, not as raw bytes.
        body = twisted.web.client.FileBodyProducer(io.BytesIO(mime))
        req = agent.request(b'POST',
                            _ascii_bytes(c.url),
                            twisted.web.client.Headers(hh),
                            body)
        req.addTimeout(c.http_timeout, reactor)
        req.addCallbacks(req_ok, req_err)
        req.addBoth(req_fin)
        outstanding += 1
# Client start-up sequence: common initialisation, configuration, the
# client's queue and HTTP agent, the ipif hook that feeds packets to
# outbound(), and finally the common run step.
common_startup()
process_cfg()
start_client()
start_ipif(c.ipif_command, outbound)
common_run()
target_requests_outstanding = 3 # must match; subject to [limits] on server
max_requests_outstanding = 4 # used by client
max_batch_up = 4000 # used by client
+http_timeout = 30 # used by client
#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# Module-level configuration parser instance.
# NOTE(review): presumably the object behind the cfg.get()/cfg.getint()
# calls made by the client code -- confirm against the importing modules.
cfg = ConfigParser()
# Module-level command-line option parser instance.
optparser = OptionParser()
# Byte translation table exchanging `-' with the SLIP ESC byte.  Built with
# bytes.maketrans because the data being translated is bytes, not str.
# NOTE(review): assumes slip.esc is a single-byte bytes object -- confirm.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')


def mime_translate(s):
    """Return bytes ``s`` with every `-' and SLIP ESC byte exchanged.

    The mapping is its own inverse, so applying it twice restores ``s``.
    """
    # SLIP-encoded packets cannot contain ESC ESC.
    # Swap `-' and ESC.  The result cannot contain `--'
    return s.translate(_mimetrans)
+
class ConfigResults:
    """Plain attribute container backed directly by a dictionary.

    If ``d`` is given it becomes the instance's attribute dictionary
    itself (not a copy), so attribute writes are visible through ``d``.
    With no argument each instance gets its own empty dictionary.
    """

    def __init__(self, d=None):
        # A literal `{ }` default is created once and shared by every call,
        # which would make all no-argument instances share their attributes.
        self.__dict__ = {} if d is None else d
return True
def process(self, sizequery, moredata, max_batch):
    """Move packets from the head of the queue into a caller-owned batch.

    sizequery() should return the size of the batch so far.
    moredata(s) should add s to the batch.

    Packets are SLIP-encoded and separated by slip.delimiter.  The first
    packet is always added, whatever its size; a later packet is added only
    if the batch, the delimiter and the packet together stay within
    max_batch.  Packets that were added are removed from the queue; the
    first packet that does not fit stays at the head.
    """
    while True:
        try:
            (dummy, packet) = self._pq[0]
        except IndexError:
            # Queue exhausted.
            break

        encoded = slip.encode(packet)
        sofar = sizequery()

        if sofar > 0:
            if sofar + len(slip.delimiter) + len(encoded) > max_batch:
                break
            moredata(slip.delimiter)

        moredata(encoded)
        # NOTE(review): assumes self._pq is a collections.deque, whose
        # method is spelled popleft() -- confirm against the constructor.
        self._pq.popleft()
#---------- error handling ----------
def crash_on_defer(defer):
    """Add an errback to ``defer`` that passes any failure to crash()."""
    defer.addErrback(lambda err: crash(err))
def crash_on_critical(event):
    """Call crash() with the formatted event if it is critical or worse.

    ``event`` is a mapping; its 'log_level' entry is compared against
    LogLevel.critical and the message is produced by
    twisted.logger.formatEvent.
    NOTE(review): presumably registered as a twisted.logger observer --
    confirm at the registration site.
    """
    if event.get('log_level') >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
break
# request, and also some non-expired packets
- while True:
- packet = self.pq.popleft()
- if packet is None: break
-
- encoded = slip.encode(packet)
-
- if request.sentLength > 0:
- if (request.sentLength + len(slip.delimiter)
- + len(encoded) > self.max_batch_down):
- break
- request.write(slip.delimiter)
-
- request.write(encoded)
- self._pq.popLeft()
+ self._pq.process((lambda: request.sentLength),
+ request.write,
+ self.max_batch_down)
assert(request.sentLength)
self._rq.popLeft()