wip, before optional args log_xxx
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
040ff511
IJ
8import twisted
9from twisted.internet import reactor
ae7c7784 10from twisted.logger import LogLevel
1d023c89 11import twisted.internet.endpoints
b0cfbfce
IJ
12
13import ipaddress
14from ipaddress import AddressValueError
15
ae7c7784
IJ
16from optparse import OptionParser
17from configparser import ConfigParser
18from configparser import NoOptionError
19
20import collections
84e763c7 21import time
ae7c7784 22
1321ad5f
IJ
23import re as regexp
24
d579a048
IJ
25from twisted.python.constants import NamedConstant
26
1321ad5f
IJ
27import hippotat.slip as slip
28
d579a048
IJ
29class DBG(twisted.python.constants.Names):
30 ROUTE = NamedConstant()
31 FLOW = NamedConstant()
32 HTTP = NamedConstant()
33 HTTP_CTRL = NamedConstant()
34 INIT = NamedConstant()
35 QUEUE = NamedConstant()
36 QUEUE_CTRL = NamedConstant()
37
ca732796
IJ
38defcfg = '''
39[DEFAULT]
40#[<client>] overrides
41max_batch_down = 65536 # used by server, subject to [limits]
42max_queue_time = 10 # used by server, subject to [limits]
43max_request_time = 54 # used by server, subject to [limits]
44target_requests_outstanding = 3 # must match; subject to [limits] on server
45max_requests_outstanding = 4 # used by client
46max_batch_up = 4000 # used by client
7b07f0b5 47http_timeout = 30 # used by client
4edf77a3 48http_retry = 5 # used by client
ca732796
IJ
49
50#[server] or [<client>] overrides
51ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
52# extra interpolations: %(local)s %(peer)s %(rnet)s
53# obtained on server [virtual]server [virtual]relay [virtual]network
54# from on client <client> [virtual]server [virtual]routes
55
56[virtual]
57mtu = 1500
58routes = ''
59# network = <prefix>/<len> # mandatory for server
60# server = <ipaddr> # used by both, default is computed from `network'
61# relay = <ipaddr> # used by server, default from `network' and `server'
62# default server is first host in network
63# default relay is first host which is not server
64
65[server]
66# addrs = 127.0.0.1 ::1 # mandatory for server
67port = 80 # used by server
68# url # used by client; default from first `addrs' and `port'
69
70# [<client-ip4-or-ipv6-address>]
71# password = <password> # used by both, must match
72
73[limits]
74max_batch_down = 262144 # used by server
75max_queue_time = 121 # used by server
76max_request_time = 121 # used by server
77target_requests_outstanding = 10 # used by server
78'''
79
87a7c0c7 80# these need to be defined here so that they can be imported by import *
ae7c7784
IJ
81cfg = ConfigParser()
82optparser = OptionParser()
83
e4006ac4 84_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
7b07f0b5
IJ
85def mime_translate(s):
86 # SLIP-encoded packets cannot contain ESC ESC.
87 # Swap `-' and ESC. The result cannot contain `--'
88 return s.translate(_mimetrans)
89
87a7c0c7
IJ
90class ConfigResults:
91 def __init__(self, d = { }):
92 self.__dict__ = d
93 def __repr__(self):
94 return 'ConfigResults('+repr(self.__dict__)+')'
95
96c = ConfigResults()
97
1321ad5f
IJ
98def log_discard(packet, saddr, daddr, why):
99 print('DROP ', saddr, daddr, why)
100# syslog.syslog(syslog.LOG_DEBUG,
101# 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
102
b0cfbfce
IJ
103#---------- packet parsing ----------
104
105def packet_addrs(packet):
106 version = packet[0] >> 4
107 if version == 4:
108 addrlen = 4
109 saddroff = 3*4
110 factory = ipaddress.IPv4Address
111 elif version == 6:
112 addrlen = 16
113 saddroff = 2*4
114 factory = ipaddress.IPv6Address
115 else:
116 raise ValueError('unsupported IP version %d' % version)
117 saddr = factory(packet[ saddroff : saddroff + addrlen ])
118 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
119 return (saddr, daddr)
120
121#---------- address handling ----------
122
123def ipaddr(input):
124 try:
125 r = ipaddress.IPv4Address(input)
126 except AddressValueError:
127 r = ipaddress.IPv6Address(input)
128 return r
129
130def ipnetwork(input):
131 try:
132 r = ipaddress.IPv4Network(input)
133 except NetworkValueError:
134 r = ipaddress.IPv6Network(input)
135 return r
040ff511
IJ
136
137#---------- ipif (SLIP) subprocess ----------
138
a95cfeb2
IJ
139class SlipStreamDecoder():
140 def __init__(self, on_packet):
141 # we will call packet(<packet>)
040ff511 142 self._buffer = b''
a95cfeb2
IJ
143 self._on_packet = on_packet
144
145 def inputdata(self, data):
4f991c0c 146 #print('SLIP-GOT ', repr(data))
040ff511
IJ
147 self._buffer += data
148 packets = slip.decode(self._buffer)
149 self._buffer = packets.pop()
150 for packet in packets:
a95cfeb2
IJ
151 self._maybe_packet(packet)
152
153 def _maybe_packet(self, packet):
154 if len(packet):
155 self._on_packet(packet)
156
4f991c0c 157 def flush(self):
a95cfeb2
IJ
158 self._maybe_packet(self._buffer)
159 self._buffer = b''
4f991c0c 160
e4006ac4 161class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
4f991c0c
IJ
162 def __init__(self, router):
163 self._router = router
a95cfeb2
IJ
164 self._decoder = SlipStreamDecoder(self.slip_on_packet)
165 def connectionMade(self): pass
166 def outReceived(self, data):
167 self._decoder.inputdata(data)
168 def slip_on_packet(self, packet):
4f991c0c
IJ
169 (saddr, daddr) = packet_addrs(packet)
170 if saddr.is_link_local or daddr.is_link_local:
171 log_discard(packet, saddr, daddr, 'link-local')
172 return
173 self._router(packet, saddr, daddr)
040ff511
IJ
174 def processEnded(self, status):
175 status.raiseException()
176
177def start_ipif(command, router):
178 global ipif
179 ipif = _IpifProcessProtocol(router)
180 reactor.spawnProcess(ipif,
181 '/bin/sh',['sh','-xc', command],
ff613365
IJ
182 childFDs={0:'w', 1:'r', 2:2},
183 env=None)
040ff511
IJ
184
185def queue_inbound(packet):
186 ipif.transport.write(slip.delimiter)
187 ipif.transport.write(slip.encode(packet))
188 ipif.transport.write(slip.delimiter)
189
650a3251
IJ
190#---------- packet queue ----------
191
192class PacketQueue():
d579a048
IJ
193 def __init__(self, desc, max_queue_time):
194 self._desc = desc
650a3251
IJ
195 self._max_queue_time = max_queue_time
196 self._pq = collections.deque() # packets
197
d579a048
IJ
198 def _log_debug(self, fn, pri, msg)
199 log_debug(pri,
200
650a3251 201 def append(self, packet):
d579a048 202 log_data(DBG.QUEUE, packet, 'pq %s: append' % self._desc)
650a3251
IJ
203 self._pq.append((time.monotonic(), packet))
204
205 def nonempty(self):
d579a048 206 log_debug(DBG.QUEUE, 'pq %s: nonempty ?' % self._desc)
650a3251
IJ
207 while True:
208 try: (queuetime, packet) = self._pq[0]
209 except IndexError: return False
210
211 age = time.monotonic() - queuetime
84e763c7 212 if age > self._max_queue_time:
650a3251
IJ
213 # strip old packets off the front
214 self._pq.popleft()
215 continue
216
217 return True
218
7b07f0b5
IJ
219 def process(self, sizequery, moredata, max_batch):
220 # sizequery() should return size of batch so far
221 # moredata(s) should add s to batch
222 while True:
223 try: (dummy, packet) = self._pq[0]
224 except IndexError: break
225
226 encoded = slip.encode(packet)
227 sofar = sizequery()
228
229 if sofar > 0:
230 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
231 break
232 moredata(slip.delimiter)
233
234 moredata(encoded)
84e763c7 235 self._pq.popleft()
ae7c7784
IJ
236
237#---------- error handling ----------
238
239def crash(err):
240 print('CRASH ', err, file=sys.stderr)
241 try: reactor.stop()
242 except twisted.internet.error.ReactorNotRunning: pass
243
244def crash_on_defer(defer):
245 defer.addErrback(lambda err: crash(err))
246
e4006ac4 247def crash_on_critical(event):
ae7c7784
IJ
248 if event.get('log_level') >= LogLevel.critical:
249 crash(twisted.logger.formatEvent(event))
250
87a7c0c7
IJ
251#---------- config processing ----------
252
253def process_cfg_common_always():
254 global mtu
255 c.mtu = cfg.get('virtual','mtu')
256
88487243
IJ
257def process_cfg_ipif(section, varmap):
258 for d, s in varmap:
259 try: v = getattr(c, s)
034284c3 260 except AttributeError: continue
88487243
IJ
261 setattr(c, d, v)
262
263 print(repr(c))
264
265 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
266
267def process_cfg_network():
268 c.network = ipnetwork(cfg.get('virtual','network'))
269 if c.network.num_addresses < 3 + 2:
270 raise ValueError('network needs at least 2^3 addresses')
271
272def process_cfg_server():
273 try:
274 c.server = cfg.get('virtual','server')
275 except NoOptionError:
276 process_cfg_network()
277 c.server = next(c.network.hosts())
278
279class ServerAddr():
280 def __init__(self, port, addrspec):
281 self.port = port
282 # also self.addr
283 try:
284 self.addr = ipaddress.IPv4Address(addrspec)
285 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
84e763c7 286 self._inurl = b'%s'
88487243
IJ
287 except AddressValueError:
288 self.addr = ipaddress.IPv6Address(addrspec)
289 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
84e763c7 290 self._inurl = b'[%s]'
88487243
IJ
291 def make_endpoint(self):
292 return self._endpointfactory(reactor, self.port, self.addr)
293 def url(self):
84e763c7
IJ
294 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
295 if self.port != 80: url += b':%d' % self.port
296 url += b'/'
88487243
IJ
297 return url
298
299def process_cfg_saddrs():
1d023c89
IJ
300 try: port = cfg.getint('server','port')
301 except NoOptionError: port = 80
88487243
IJ
302
303 c.saddrs = [ ]
304 for addrspec in cfg.get('server','addrs').split():
305 sa = ServerAddr(port, addrspec)
306 c.saddrs.append(sa)
307
308def process_cfg_clients(constructor):
309 c.clients = [ ]
310 for cs in cfg.sections():
311 if not (':' in cs or '.' in cs): continue
312 ci = ipaddr(cs)
313 pw = cfg.get(cs, 'password')
84e763c7 314 pw = pw.encode('utf-8')
88487243
IJ
315 constructor(ci,cs,pw)
316
ae7c7784
IJ
317#---------- startup ----------
318
1321ad5f 319def common_startup():
ae7c7784
IJ
320 twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
321
322 optparser.add_option('-c', '--config', dest='configfile',
323 default='/etc/hippotat/config')
324 (opts, args) = optparser.parse_args()
325 if len(args): optparser.error('no non-option arguments please')
326
1321ad5f
IJ
327 re = regexp.compile('#.*')
328 cfg.read_string(re.sub('', defcfg))
ae7c7784
IJ
329 cfg.read(opts.configfile)
330
331def common_run():
332 reactor.run()
333 print('CRASHED (end)', file=sys.stderr)