fixes
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
# -*- python -*-
2
37ab4cdc
IJ
import signal
signal.signal(signal.SIGINT, signal.SIG_DFL)

import sys
import time

import twisted
from twisted.internet import reactor
from twisted.logger import LogLevel
import twisted.internet.endpoints

import ipaddress
from ipaddress import AddressValueError

from optparse import OptionParser
from configparser import ConfigParser
from configparser import NoOptionError

import collections

import re as regexp

import hippotat.slip as slip
25
ca732796
IJ
# Built-in default configuration, in ConfigParser (INI) syntax.
# common_startup() strips everything from `#' to end of line and feeds
# the result to cfg.read_string() before reading the user's config file,
# so the `#' text below is documentation only.
defcfg = '''
[DEFAULT]
#[<client>] overrides
max_batch_down = 65536 # used by server, subject to [limits]
max_queue_time = 10 # used by server, subject to [limits]
max_request_time = 54 # used by server, subject to [limits]
target_requests_outstanding = 3 # must match; subject to [limits] on server
max_requests_outstanding = 4 # used by client
max_batch_up = 4000 # used by client
http_timeout = 30 # used by client

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations:  %(local)s        %(peer)s         %(rnet)s
# obtained   on server   [virtual]server  [virtual]relay   [virtual]network
#      from  on client   <client>         [virtual]server  [virtual]routes

[virtual]
mtu = 1500
routes = ''
# network = <prefix>/<len> # mandatory for server
# server = <ipaddr> # used by both, default is computed from `network'
# relay = <ipaddr> # used by server, default from `network' and `server'
# default server is first host in network
# default relay is first host which is not server

[server]
# addrs = 127.0.0.1 ::1 # mandatory for server
port = 80 # used by server
# url # used by client; default from first `addrs' and `port'

# [<client-ip4-or-ipv6-address>]
# password = <password> # used by both, must match

[limits]
max_batch_down = 262144 # used by server
max_queue_time = 121 # used by server
max_request_time = 121 # used by server
target_requests_outstanding = 10 # used by server
'''
66
# these need to be defined here so that they can be imported by import *
cfg = ConfigParser()        # populated by common_startup()
optparser = OptionParser()  # options are added and parsed in common_startup()
70
# Byte translation table which exchanges `-' and slip.esc.
# NOTE(review): presumably slip.esc is a single byte -- confirm in slip.py.
_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')

def mime_translate(s):
    """Return the bytes `s` with every `-' and SLIP ESC byte exchanged.

    The same table maps each of the two bytes onto the other, so applying
    the function twice gives back the input.
    """
    # SLIP-encoded packets cannot contain ESC ESC.
    # Swap `-' and ESC.  The result cannot contain `--'
    return s.translate(_mimetrans)
76
87a7c0c7
IJ
class ConfigResults:
    """Attribute bag holding the processed configuration.

    If a dict `d` is supplied it becomes the instance's attribute
    dictionary (it is aliased, not copied).  With no argument each
    instance gets its own fresh, empty dictionary.
    """

    def __init__(self, d = None):
        # A `{ }' default would be created once and then shared as the
        # __dict__ of every default-constructed instance.
        if d is None:
            d = { }
        self.__dict__ = d

    def __repr__(self):
        return 'ConfigResults('+repr(self.__dict__)+')'

# The module-wide configuration results, filled in by the process_cfg_*
# functions.
c = ConfigResults()
84
1321ad5f
IJ
def log_discard(packet, saddr, daddr, why):
    """Report on stdout that a packet is being dropped.

    `saddr', `daddr' and `why' are printed; `packet' itself is currently
    not used.
    """
    print('DROP ', saddr, daddr, why)
#    syslog.syslog(syslog.LOG_DEBUG,
#                  'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
89
b0cfbfce
IJ
#---------- packet parsing ----------
91
def packet_addrs(packet):
    """Extract the source and destination addresses of an IP packet.

    `packet' is a bytes-like object starting at the IP header.  Returns
    a tuple (saddr, daddr) of ipaddress.IPv4Address or
    ipaddress.IPv6Address objects, according to the version nibble.

    Raises ValueError if the version is neither 4 nor 6.  (The ipaddress
    constructors also raise a ValueError subclass if the packet is too
    short to contain both addresses.)
    """
    # The IP version is the top four bits of the first octet.
    version = packet[0] >> 4
    if version == 4:
        addrlen = 4
        saddroff = 3*4      # source address follows three 32-bit words
        factory = ipaddress.IPv4Address
    elif version == 6:
        addrlen = 16
        saddroff = 2*4      # source address follows two 32-bit words
        factory = ipaddress.IPv6Address
    else:
        raise ValueError('unsupported IP version %d' % version)
    # The destination address immediately follows the source address.
    saddr = factory(packet[ saddroff           : saddroff + addrlen   ])
    daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
    return (saddr, daddr)
107
#---------- address handling ----------
109
def ipaddr(input):
    """Parse `input' as an IP address, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Address or ipaddress.IPv6Address.  If
    neither parse succeeds, the AddressValueError from the IPv6 attempt
    propagates.
    """
    try:
        r = ipaddress.IPv4Address(input)
    except AddressValueError:
        r = ipaddress.IPv6Address(input)
    return r
116
def ipnetwork(input):
    """Parse `input' as an IP network, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.  If the
    IPv4 parse rejects the address or the netmask, the IPv6 parse is
    tried and any exception it raises propagates.  Other ValueErrors
    from the IPv4 parse (for example host bits set) propagate directly.
    """
    try:
        r = ipaddress.IPv4Network(input)
    # ipaddress has no `NetworkValueError'; these are the two exceptions
    # IPv4Network raises for something which is not an IPv4 network.
    except (AddressValueError, ipaddress.NetmaskValueError):
        r = ipaddress.IPv6Network(input)
    return r
040ff511
IJ
123
#---------- ipif (SLIP) subprocess ----------
125
a95cfeb2
IJ
class SlipStreamDecoder():
    """Reassemble packets from a SLIP byte stream arriving in chunks.

    Data is fed in with inputdata(); each complete, nonempty packet is
    passed to the `on_packet' callback.  Bytes after the last complete
    packet are kept until more data arrives or flush() is called.
    """

    def __init__(self, on_packet):
        # we will call on_packet(<packet>)
        self._buffer = b''
        self._on_packet = on_packet

    def inputdata(self, data):
        """Accept more stream bytes and deliver any packets now complete."""
        #print('SLIP-GOT ', repr(data))
        self._buffer += data
        packets = slip.decode(self._buffer)
        # The last element returned by slip.decode is kept back as the
        # (possibly incomplete) remainder for next time.
        self._buffer = packets.pop()
        for packet in packets:
            self._maybe_packet(packet)

    def _maybe_packet(self, packet):
        """Deliver `packet' to the callback unless it is empty."""
        if len(packet):
            self._on_packet(packet)

    def flush(self):
        """Deliver whatever is buffered as a packet and empty the buffer."""
        self._maybe_packet(self._buffer)
        self._buffer = b''
4f991c0c 147
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    The subprocess's stdout is decoded as a SLIP stream; each packet is
    handed to `router(packet, saddr, daddr)' unless its source or
    destination address is link-local, in which case it is discarded.
    """

    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder(self.slip_on_packet)

    def connectionMade(self): pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        (saddr, daddr) = packet_addrs(packet)
        if saddr.is_link_local or daddr.is_link_local:
            log_discard(packet, saddr, daddr, 'link-local')
            return
        self._router(packet, saddr, daddr)

    def processEnded(self, status):
        # Re-raise the reason the subprocess ended, in this context.
        status.raiseException()
163
def start_ipif(command, router):
    """Spawn the ipif subprocess and attach a protocol object to it.

    `command' is a shell command line, run via `/bin/sh -xc'.  Packets
    read from the subprocess are passed to `router' (see
    _IpifProcessProtocol).  The protocol object is stored in the module
    global `ipif', which queue_inbound() uses.
    """
    global ipif
    ipif = _IpifProcessProtocol(router)
    reactor.spawnProcess(ipif,
                         '/bin/sh',['sh','-xc', command],
                         # we write to the child's stdin and read from its
                         # stdout; the child's stderr is our own fd 2
                         childFDs={0:'w', 1:'r', 2:2},
                         env=None)
040ff511
IJ
171
def queue_inbound(packet):
    """Write `packet' to the ipif subprocess, SLIP-encoded.

    The encoded packet is preceded and followed by a SLIP delimiter.
    Requires start_ipif() to have been called, since it uses the global
    `ipif'.
    """
    ipif.transport.write(slip.delimiter)
    ipif.transport.write(slip.encode(packet))
    ipif.transport.write(slip.delimiter)
176
650a3251
IJ
#---------- packet queue ----------
178
class PacketQueue():
    """FIFO of packets, each stamped with the time it was queued.

    `max_queue_time' is the maximum age, in the units of
    time.monotonic() (seconds), beyond which nonempty() throws a queued
    packet away.
    """

    def __init__(self, max_queue_time):
        self._max_queue_time = max_queue_time
        self._pq = collections.deque() # (queue time, packet) tuples

    def append(self, packet):
        """Add `packet' to the back of the queue, stamped with now."""
        self._pq.append((time.monotonic(), packet))

    def nonempty(self):
        """Discard over-age packets; return whether any packet remains."""
        while True:
            try: (queuetime, packet) = self._pq[0]
            except IndexError: return False

            age = time.monotonic() - queuetime
            if age > self._max_queue_time:
                # strip old packets off the front
                self._pq.popleft()
                continue

            return True

    def process(self, sizequery, moredata, max_batch):
        """Move packets from the front of the queue into a batch.

        sizequery() should return size of batch so far
        moredata(s) should add s to batch

        Packets are SLIP-encoded and separated by slip.delimiter.
        Packets are taken until the queue is empty or the next one would
        take a nonempty batch over `max_batch'.  A packet is always
        added to an empty batch, whatever its size.
        """
        while True:
            try: (dummy, packet) = self._pq[0]
            except IndexError: break

            encoded = slip.encode(packet)
            sofar = sizequery()

            if sofar > 0:
                if sofar + len(slip.delimiter) + len(encoded) > max_batch:
                    break
                moredata(slip.delimiter)

            moredata(encoded)
            # only now that it is in the batch do we dequeue the packet
            self._pq.popleft()
ae7c7784
IJ
217
#---------- error handling ----------
219
def crash(err):
    """Report `err' on stderr and stop the reactor.

    It is not an error if the reactor is already not running.
    """
    print('CRASH ', err, file=sys.stderr)
    try: reactor.stop()
    except twisted.internet.error.ReactorNotRunning: pass
224
def crash_on_defer(defer):
    """Arrange for an error reaching `defer's errback chain to crash().

    `defer' need only provide addErrback(); presumably it is a Twisted
    Deferred.
    """
    defer.addErrback(lambda err: crash(err))
227
def crash_on_critical(event):
    """Log observer: crash() on any event at critical level or above.

    `event' is a log event dict; its 'log_level' entry is compared with
    LogLevel.critical, and the formatted event is given to crash().
    common_startup() installs this on the global log publisher.
    """
    if event.get('log_level') >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
231
87a7c0c7
IJ
#---------- config processing ----------
233
def process_cfg_common_always():
    """Copy the settings every program needs from cfg into c.

    Currently this is just c.mtu, stored as the string that
    cfg.get() returns.
    """
    c.mtu = cfg.get('virtual','mtu')
237
88487243
IJ
def process_cfg_ipif(section, varmap):
    """Compute c.ipif_command from the `ipif' setting in `section'.

    `varmap' is an iterable of (dest, source) attribute name pairs.  For
    each pair, c.<source> is copied to c.<dest>; pairs whose source
    attribute does not exist are skipped.  All attributes of c are then
    made available as interpolation variables for the `ipif' value.
    """
    for d, s in varmap:
        try: v = getattr(c, s)
        except AttributeError: continue
        setattr(c, d, v)

    print(repr(c))

    c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
247
def process_cfg_network():
    """Set c.network from [virtual] network.

    Raises ValueError if the network has fewer than 5 addresses.
    """
    c.network = ipnetwork(cfg.get('virtual','network'))
    # Network sizes are powers of two, so requiring at least 3 + 2
    # addresses means requiring at least 2^3.
    if c.network.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')
252
def process_cfg_server():
    """Set c.server from [virtual] server, or default it from the network.

    If `server' is not configured, process_cfg_network() is run and the
    first host address in c.network is used.
    """
    # NOTE(review): the configured value is stored as the string from
    # cfg.get(), the default as an ipaddress object -- confirm that
    # users of c.server cope with both.
    try:
        c.server = cfg.get('virtual','server')
    except NoOptionError:
        process_cfg_network()
        c.server = next(c.network.hosts())
259
class ServerAddr():
    """One address and port on which the server listens.

    `addrspec' is an IPv4 or IPv6 address string; `port' is an integer.
    Attributes: port, and addr (an ipaddress.IPv4Address or
    ipaddress.IPv6Address).
    """

    def __init__(self, port, addrspec):
        self.port = port
        # also self.addr
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = '%s'
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            self._inurl = '[%s]'    # IPv6 literals are bracketed in URLs

    def make_endpoint(self):
        """Return a Twisted server endpoint bound to this address and port."""
        # The third positional parameter of the TCP server endpoints is
        # `backlog'; the address to bind to must be given as `interface',
        # and as a string.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        """Return the http URL for this address; port 80 is left implicit."""
        url = 'http://' + (self._inurl % self.addr)
        if self.port != 80: url += ':%d' % self.port
        url += '/'
        return url
279
def process_cfg_saddrs():
    """Set c.saddrs to a list of ServerAddr, one per [server] addrs entry.

    `addrs' is split on whitespace.  Every entry uses [server] port,
    or 80 if that is not set.
    """
    try: port = cfg.getint('server','port')
    except NoOptionError: port = 80

    c.saddrs = [ ]
    for addrspec in cfg.get('server','addrs').split():
        sa = ServerAddr(port, addrspec)
        c.saddrs.append(sa)
288
def process_cfg_clients(constructor):
    """Call `constructor' for each client section in the configuration.

    A section is taken to be a client if its name contains `:' or `.';
    the name is parsed as an IP address.  The call is
    constructor(<address object>, <section name>, <password>).
    """
    # NOTE(review): c.clients is reset here but nothing in this function
    # appends to it -- presumably `constructor' does; confirm in callers.
    c.clients = [ ]
    for cs in cfg.sections():
        if not (':' in cs or '.' in cs): continue
        ci = ipaddr(cs)
        pw = cfg.get(cs, 'password')
        constructor(ci,cs,pw)
296
ae7c7784
IJ
#---------- startup ----------
298
def common_startup():
    """Startup steps shared by every program using this module.

    Installs crash_on_critical as a log observer, parses the command
    line (adding the -c/--config option to optparser first), and loads
    into cfg first the built-in defaults and then the config file.
    """
    twisted.logger.globalLogPublisher.addObserver(crash_on_critical)

    optparser.add_option('-c', '--config', dest='configfile',
                         default='/etc/hippotat/config')
    (opts, args) = optparser.parse_args()
    if len(args): optparser.error('no non-option arguments please')

    # defcfg documents itself with `#' comments, including ones at the
    # end of value lines; remove them all before parsing it.
    comment_re = regexp.compile('#.*')
    cfg.read_string(comment_re.sub('', defcfg))
    cfg.read(opts.configfile)
310
def common_run():
    """Run the reactor; when it returns, report that on stderr."""
    reactor.run()
    print('CRASHED (end)', file=sys.stderr)