wip, new logging in server
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
040ff511
IJ
8import twisted
9from twisted.internet import reactor
1d023c89 10import twisted.internet.endpoints
8c3b6620
IJ
11import twisted.logger
12from twisted.logger import LogLevel
13import twisted.python.constants
14from twisted.python.constants import NamedConstant
b0cfbfce
IJ
15
16import ipaddress
17from ipaddress import AddressValueError
18
ae7c7784
IJ
19from optparse import OptionParser
20from configparser import ConfigParser
21from configparser import NoOptionError
22
23import collections
84e763c7 24import time
8c3b6620 25import codecs
ae7c7784 26
1321ad5f
IJ
27import re as regexp
28
29import hippotat.slip as slip
30
d579a048
IJ
31class DBG(twisted.python.constants.Names):
32 ROUTE = NamedConstant()
b68c0739 33 DROP = NamedConstant()
d579a048
IJ
34 FLOW = NamedConstant()
35 HTTP = NamedConstant()
36 HTTP_CTRL = NamedConstant()
37 INIT = NamedConstant()
38 QUEUE = NamedConstant()
39 QUEUE_CTRL = NamedConstant()
40
b68c0739 41_hex_codec = codecs.getencoder('hex_codec')
8c3b6620
IJ
42
43log = twisted.logger.Logger()
44
45def log_debug(dflag, msg, idof=None, d=None):
46 if idof is not None:
47 msg = '[%d] %s' % (id(idof), msg)
48 if d is not None:
49 d = d[0:64]
b68c0739 50 d = _hex_codec(d)[0].decode('ascii')
8c3b6620
IJ
51 msg += ' ' + d
52 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
53
ca732796
IJ
54defcfg = '''
55[DEFAULT]
56#[<client>] overrides
57max_batch_down = 65536 # used by server, subject to [limits]
58max_queue_time = 10 # used by server, subject to [limits]
59max_request_time = 54 # used by server, subject to [limits]
60target_requests_outstanding = 3 # must match; subject to [limits] on server
61max_requests_outstanding = 4 # used by client
62max_batch_up = 4000 # used by client
7b07f0b5 63http_timeout = 30 # used by client
4edf77a3 64http_retry = 5 # used by client
ca732796
IJ
65
66#[server] or [<client>] overrides
67ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
68# extra interpolations: %(local)s %(peer)s %(rnet)s
69# obtained on server [virtual]server [virtual]relay [virtual]network
70# from on client <client> [virtual]server [virtual]routes
71
72[virtual]
73mtu = 1500
74routes = ''
75# network = <prefix>/<len> # mandatory for server
76# server = <ipaddr> # used by both, default is computed from `network'
77# relay = <ipaddr> # used by server, default from `network' and `server'
78# default server is first host in network
79# default relay is first host which is not server
80
81[server]
82# addrs = 127.0.0.1 ::1 # mandatory for server
83port = 80 # used by server
84# url # used by client; default from first `addrs' and `port'
85
86# [<client-ip4-or-ipv6-address>]
87# password = <password> # used by both, must match
88
89[limits]
90max_batch_down = 262144 # used by server
91max_queue_time = 121 # used by server
92max_request_time = 121 # used by server
93target_requests_outstanding = 10 # used by server
94'''
95
87a7c0c7 96# these need to be defined here so that they can be imported by import *
ae7c7784
IJ
97cfg = ConfigParser()
98optparser = OptionParser()
99
e4006ac4 100_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
7b07f0b5
IJ
101def mime_translate(s):
102 # SLIP-encoded packets cannot contain ESC ESC.
103 # Swap `-' and ESC. The result cannot contain `--'
104 return s.translate(_mimetrans)
105
87a7c0c7
IJ
106class ConfigResults:
107 def __init__(self, d = { }):
108 self.__dict__ = d
109 def __repr__(self):
110 return 'ConfigResults('+repr(self.__dict__)+')'
111
112c = ConfigResults()
113
1321ad5f 114def log_discard(packet, saddr, daddr, why):
b68c0739
IJ
115 log_debug(DBG.DROP,
116 'discarded packet %s -> %s (%s)' % (saddr, daddr, why),
117 d=packet)
1321ad5f 118
b0cfbfce
IJ
119#---------- packet parsing ----------
120
121def packet_addrs(packet):
122 version = packet[0] >> 4
123 if version == 4:
124 addrlen = 4
125 saddroff = 3*4
126 factory = ipaddress.IPv4Address
127 elif version == 6:
128 addrlen = 16
129 saddroff = 2*4
130 factory = ipaddress.IPv6Address
131 else:
132 raise ValueError('unsupported IP version %d' % version)
133 saddr = factory(packet[ saddroff : saddroff + addrlen ])
134 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
135 return (saddr, daddr)
136
137#---------- address handling ----------
138
139def ipaddr(input):
140 try:
141 r = ipaddress.IPv4Address(input)
142 except AddressValueError:
143 r = ipaddress.IPv6Address(input)
144 return r
145
146def ipnetwork(input):
147 try:
148 r = ipaddress.IPv4Network(input)
149 except NetworkValueError:
150 r = ipaddress.IPv6Network(input)
151 return r
040ff511
IJ
152
153#---------- ipif (SLIP) subprocess ----------
154
a95cfeb2
IJ
155class SlipStreamDecoder():
156 def __init__(self, on_packet):
157 # we will call packet(<packet>)
040ff511 158 self._buffer = b''
a95cfeb2
IJ
159 self._on_packet = on_packet
160
161 def inputdata(self, data):
4f991c0c 162 #print('SLIP-GOT ', repr(data))
040ff511
IJ
163 self._buffer += data
164 packets = slip.decode(self._buffer)
165 self._buffer = packets.pop()
166 for packet in packets:
a95cfeb2
IJ
167 self._maybe_packet(packet)
168
169 def _maybe_packet(self, packet):
170 if len(packet):
171 self._on_packet(packet)
172
4f991c0c 173 def flush(self):
a95cfeb2
IJ
174 self._maybe_packet(self._buffer)
175 self._buffer = b''
4f991c0c 176
e4006ac4 177class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
4f991c0c
IJ
178 def __init__(self, router):
179 self._router = router
a95cfeb2
IJ
180 self._decoder = SlipStreamDecoder(self.slip_on_packet)
181 def connectionMade(self): pass
182 def outReceived(self, data):
183 self._decoder.inputdata(data)
184 def slip_on_packet(self, packet):
4f991c0c
IJ
185 (saddr, daddr) = packet_addrs(packet)
186 if saddr.is_link_local or daddr.is_link_local:
187 log_discard(packet, saddr, daddr, 'link-local')
188 return
189 self._router(packet, saddr, daddr)
040ff511
IJ
190 def processEnded(self, status):
191 status.raiseException()
192
193def start_ipif(command, router):
194 global ipif
195 ipif = _IpifProcessProtocol(router)
196 reactor.spawnProcess(ipif,
197 '/bin/sh',['sh','-xc', command],
ff613365
IJ
198 childFDs={0:'w', 1:'r', 2:2},
199 env=None)
040ff511
IJ
200
201def queue_inbound(packet):
202 ipif.transport.write(slip.delimiter)
203 ipif.transport.write(slip.encode(packet))
204 ipif.transport.write(slip.delimiter)
205
650a3251
IJ
206#---------- packet queue ----------
207
208class PacketQueue():
d579a048
IJ
209 def __init__(self, desc, max_queue_time):
210 self._desc = desc
650a3251
IJ
211 self._max_queue_time = max_queue_time
212 self._pq = collections.deque() # packets
213
b68c0739 214 def _log(self, dflag, msg, **kwargs):
8c3b6620 215 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
d579a048 216
650a3251 217 def append(self, packet):
8c3b6620 218 self._log(DBG.QUEUE, 'append', d=packet)
650a3251
IJ
219 self._pq.append((time.monotonic(), packet))
220
221 def nonempty(self):
8c3b6620 222 self._log(DBG.QUEUE, 'nonempty ?')
650a3251
IJ
223 while True:
224 try: (queuetime, packet) = self._pq[0]
8c3b6620
IJ
225 except IndexError:
226 self._log(DBG.QUEUE, 'nonempty ? empty.')
227 return False
650a3251
IJ
228
229 age = time.monotonic() - queuetime
84e763c7 230 if age > self._max_queue_time:
650a3251 231 # strip old packets off the front
8c3b6620 232 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
650a3251
IJ
233 self._pq.popleft()
234 continue
235
8c3b6620 236 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
650a3251
IJ
237 return True
238
7b07f0b5
IJ
239 def process(self, sizequery, moredata, max_batch):
240 # sizequery() should return size of batch so far
241 # moredata(s) should add s to batch
8c3b6620 242 self._log(DBG.QUEUE, 'process...')
7b07f0b5
IJ
243 while True:
244 try: (dummy, packet) = self._pq[0]
8c3b6620
IJ
245 except IndexError:
246 self._log(DBG.QUEUE, 'process... empty')
247 break
248
249 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
7b07f0b5
IJ
250
251 encoded = slip.encode(packet)
252 sofar = sizequery()
253
8c3b6620
IJ
254 self._log(DBG.QUEUE_CTRL,
255 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
b68c0739 256 d=encoded)
8c3b6620 257
7b07f0b5
IJ
258 if sofar > 0:
259 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
8c3b6620 260 self._log(DBG.QUEUE_CTRL, 'process... overflow')
7b07f0b5
IJ
261 break
262 moredata(slip.delimiter)
263
264 moredata(encoded)
84e763c7 265 self._pq.popleft()
ae7c7784
IJ
266
267#---------- error handling ----------
268
b68c0739
IJ
269_crashing = False
270
ae7c7784 271def crash(err):
b68c0739
IJ
272 global _crashing
273 _crashing = True
ae7c7784
IJ
274 print('CRASH ', err, file=sys.stderr)
275 try: reactor.stop()
276 except twisted.internet.error.ReactorNotRunning: pass
277
278def crash_on_defer(defer):
279 defer.addErrback(lambda err: crash(err))
280
e4006ac4 281def crash_on_critical(event):
ae7c7784
IJ
282 if event.get('log_level') >= LogLevel.critical:
283 crash(twisted.logger.formatEvent(event))
284
87a7c0c7
IJ
285#---------- config processing ----------
286
287def process_cfg_common_always():
288 global mtu
289 c.mtu = cfg.get('virtual','mtu')
290
88487243
IJ
291def process_cfg_ipif(section, varmap):
292 for d, s in varmap:
293 try: v = getattr(c, s)
034284c3 294 except AttributeError: continue
88487243
IJ
295 setattr(c, d, v)
296
b68c0739 297 #print(repr(c))
88487243
IJ
298
299 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
300
301def process_cfg_network():
302 c.network = ipnetwork(cfg.get('virtual','network'))
303 if c.network.num_addresses < 3 + 2:
304 raise ValueError('network needs at least 2^3 addresses')
305
306def process_cfg_server():
307 try:
308 c.server = cfg.get('virtual','server')
309 except NoOptionError:
310 process_cfg_network()
311 c.server = next(c.network.hosts())
312
313class ServerAddr():
314 def __init__(self, port, addrspec):
315 self.port = port
316 # also self.addr
317 try:
318 self.addr = ipaddress.IPv4Address(addrspec)
319 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
84e763c7 320 self._inurl = b'%s'
88487243
IJ
321 except AddressValueError:
322 self.addr = ipaddress.IPv6Address(addrspec)
323 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
84e763c7 324 self._inurl = b'[%s]'
88487243
IJ
325 def make_endpoint(self):
326 return self._endpointfactory(reactor, self.port, self.addr)
327 def url(self):
84e763c7
IJ
328 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
329 if self.port != 80: url += b':%d' % self.port
330 url += b'/'
88487243
IJ
331 return url
332
333def process_cfg_saddrs():
1d023c89
IJ
334 try: port = cfg.getint('server','port')
335 except NoOptionError: port = 80
88487243
IJ
336
337 c.saddrs = [ ]
338 for addrspec in cfg.get('server','addrs').split():
339 sa = ServerAddr(port, addrspec)
340 c.saddrs.append(sa)
341
342def process_cfg_clients(constructor):
343 c.clients = [ ]
344 for cs in cfg.sections():
345 if not (':' in cs or '.' in cs): continue
346 ci = ipaddr(cs)
347 pw = cfg.get(cs, 'password')
84e763c7 348 pw = pw.encode('utf-8')
88487243
IJ
349 constructor(ci,cs,pw)
350
ae7c7784
IJ
351#---------- startup ----------
352
1321ad5f 353def common_startup():
8c3b6620
IJ
354 log_formatter = twisted.logger.formatEventAsClassicLogText
355 log_observer = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
356 twisted.logger.globalLogBeginner.beginLoggingTo(
357 [ log_observer, crash_on_critical ]
358 )
ae7c7784
IJ
359
360 optparser.add_option('-c', '--config', dest='configfile',
361 default='/etc/hippotat/config')
362 (opts, args) = optparser.parse_args()
363 if len(args): optparser.error('no non-option arguments please')
364
1321ad5f
IJ
365 re = regexp.compile('#.*')
366 cfg.read_string(re.sub('', defcfg))
ae7c7784
IJ
367 cfg.read(opts.configfile)
368
369def common_run():
b68c0739
IJ
370 log_debug(DBG.INIT, 'entering reactor')
371 if not _crashing: reactor.run()
ae7c7784 372 print('CRASHED (end)', file=sys.stderr)