fixes
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
040ff511
IJ
8import twisted
9from twisted.internet import reactor
ae7c7784 10from twisted.logger import LogLevel
1d023c89 11import twisted.internet.endpoints
b0cfbfce
IJ
12
13import ipaddress
14from ipaddress import AddressValueError
15
ae7c7784
IJ
16from optparse import OptionParser
17from configparser import ConfigParser
18from configparser import NoOptionError
19
20import collections
84e763c7 21import time
ae7c7784 22
1321ad5f
IJ
23import re as regexp
24
25import hippotat.slip as slip
26
ca732796
IJ
# Built-in default configuration, in ConfigParser (INI) syntax.
# common_startup() removes everything from `#' to the end of each line
# before parsing this text, so the trailing remarks below never become
# part of an option value.
defcfg = '''
[DEFAULT]
#[<client>] overrides
max_batch_down = 65536 # used by server, subject to [limits]
max_queue_time = 10 # used by server, subject to [limits]
max_request_time = 54 # used by server, subject to [limits]
target_requests_outstanding = 3 # must match; subject to [limits] on server
max_requests_outstanding = 4 # used by client
max_batch_up = 4000 # used by client
http_timeout = 30 # used by client

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations: %(local)s %(peer)s %(rnet)s
# obtained on server [virtual]server [virtual]relay [virtual]network
# from on client <client> [virtual]server [virtual]routes

[virtual]
mtu = 1500
routes = ''
# network = <prefix>/<len> # mandatory for server
# server = <ipaddr> # used by both, default is computed from `network'
# relay = <ipaddr> # used by server, default from `network' and `server'
# default server is first host in network
# default relay is first host which is not server

[server]
# addrs = 127.0.0.1 ::1 # mandatory for server
port = 80 # used by server
# url # used by client; default from first `addrs' and `port'

# [<client-ip4-or-ipv6-address>]
# password = <password> # used by both, must match

[limits]
max_batch_down = 262144 # used by server
max_queue_time = 121 # used by server
max_request_time = 121 # used by server
target_requests_outstanding = 10 # used by server
'''
67
# These objects are created at module level (rather than inside a
# function) so that they exist when another module does `import *'
# from this one.
cfg = ConfigParser()
optparser = OptionParser()
71
# Byte translation table that exchanges `-' with the SLIP ESC byte(s)
# and vice versa.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')


def mime_translate(s):
    """Return *s* with `-' and the SLIP ESC byte exchanged.

    Author's rationale: SLIP-encoded packets cannot contain ESC ESC, so
    after the swap the result cannot contain `--'.

    *s* is passed through ``translate`` with a ``bytes.maketrans`` table,
    so it is expected to be a bytes-like object.
    """
    return s.translate(_mimetrans)
77
87a7c0c7
IJ
class ConfigResults:
    """Attribute bag holding processed configuration values.

    The mapping passed as *d* becomes the instance's ``__dict__`` itself
    (no copy is made), so attributes set on the instance show up in that
    mapping and vice versa.
    """

    def __init__(self, d=None):
        # Create a fresh dict per instance.  A literal `{}` default would
        # be evaluated once and then shared by every instance constructed
        # without an argument, making them all alias the same attributes.
        self.__dict__ = {} if d is None else d

    def __repr__(self):
        return 'ConfigResults(' + repr(self.__dict__) + ')'


# Module-wide results object that the process_cfg_* functions fill in.
c = ConfigResults()
85
1321ad5f
IJ
def log_discard(packet, saddr, daddr, why):
    """Report a discarded packet on stdout.

    Only the two addresses and the reason are printed; *packet* is
    accepted but not used.
    """
    # NOTE: the original author left a disabled syslog variant here
    # (LOG_DEBUG, 'discarded packet %s -> %s (%s)'); it is not active.
    fields = ('DROP ', saddr, daddr, why)
    print(*fields)
90
b0cfbfce
IJ
91#---------- packet parsing ----------
92
def packet_addrs(packet):
    """Extract the (source, destination) addresses from a raw IP packet.

    The IP version is taken from the top four bits of the first byte.

    Returns a pair of ipaddress.IPv4Address or ipaddress.IPv6Address
    objects.  Raises ValueError for any version other than 4 or 6.
    """
    ip_version = packet[0] >> 4

    # version -> (address length, offset of source address, address class)
    layouts = {
        4: (4, 3 * 4, ipaddress.IPv4Address),
        6: (16, 2 * 4, ipaddress.IPv6Address),
    }
    layout = layouts.get(ip_version)
    if layout is None:
        raise ValueError('unsupported IP version %d' % ip_version)

    width, src_at, make_addr = layout
    dst_at = src_at + width  # destination immediately follows source
    saddr = make_addr(packet[src_at:dst_at])
    daddr = make_addr(packet[dst_at:dst_at + width])
    return (saddr, daddr)
108
109#---------- address handling ----------
110
def ipaddr(input):
    """Parse *input* as an IP address, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Address or ipaddress.IPv6Address.  If the
    IPv6 attempt fails as well, its exception propagates to the caller.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        # Not valid IPv4 syntax; fall back to IPv6.
        return ipaddress.IPv6Address(input)
117
def ipnetwork(input):
    """Parse *input* as an IP network, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.  If the
    IPv6 attempt fails as well, its exception propagates to the caller.
    """
    try:
        return ipaddress.IPv4Network(input)
    except AddressValueError:
        # ipaddress signals "address part is not IPv4" with
        # AddressValueError (there is no NetworkValueError); fall back
        # to IPv6.  A bad netmask on a valid IPv4 address still raises
        # NetmaskValueError to the caller.
        return ipaddress.IPv6Network(input)
040ff511
IJ
124
125#---------- ipif (SLIP) subprocess ----------
126
a95cfeb2
IJ
class SlipStreamDecoder():
    """Incremental decoder that turns a SLIP byte stream into packets.

    *on_packet* is called as on_packet(<packet>) once for every
    non-empty packet that has been completed.
    """

    def __init__(self, on_packet):
        self._on_packet = on_packet
        self._buffer = b''  # bytes received but not yet delivered

    def inputdata(self, data):
        """Feed more stream bytes and deliver the packets they complete."""
        #print('SLIP-GOT ', repr(data))
        self._buffer += data
        pieces = slip.decode(self._buffer)
        # The final piece is held back in the buffer; later input is
        # appended to it before decoding again.
        self._buffer = pieces.pop()
        for piece in pieces:
            self._maybe_packet(piece)

    def _maybe_packet(self, packet):
        """Deliver *packet* to the callback unless it is empty."""
        if not len(packet):
            return
        self._on_packet(packet)

    def flush(self):
        """Deliver whatever is buffered as a packet, then clear the buffer."""
        self._maybe_packet(self._buffer)
        self._buffer = b''
4f991c0c 148
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol that SLIP-decodes the child's output.

    *router* is called as router(packet, saddr, daddr) for every decoded
    packet that is not discarded.
    """

    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder(self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        # Output from the child is a SLIP stream; hand it to the decoder.
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        """Route one decoded packet, dropping link-local traffic."""
        saddr, daddr = packet_addrs(packet)
        if any(addr.is_link_local for addr in (saddr, daddr)):
            log_discard(packet, saddr, daddr, 'link-local')
            return
        self._router(packet, saddr, daddr)

    def processEnded(self, status):
        # Re-raise whatever exception *status* carries (presumably the
        # Twisted Failure describing how the child ended).
        status.raiseException()
164
def start_ipif(command, router):
    """Spawn *command* under /bin/sh and attach it to the global `ipif`.

    The module-global `ipif` is set to a new _IpifProcessProtocol built
    around *router*; queue_inbound() writes to the child through it.
    """
    global ipif
    ipif = _IpifProcessProtocol(router)

    argv = ['sh', '-xc', command]
    # fd 0: we write to the child; fd 1: we read from the child;
    # fd 2: mapped to our own fd 2.
    fds = {0: 'w', 1: 'r', 2: 2}
    reactor.spawnProcess(ipif, '/bin/sh', argv, childFDs=fds, env=None)
040ff511
IJ
172
def queue_inbound(packet):
    """Write *packet* to the ipif child, SLIP-encoded.

    A SLIP delimiter is written both before and after the encoded packet.
    """
    send = ipif.transport.write
    send(slip.delimiter)
    send(slip.encode(packet))
    send(slip.delimiter)
177
650a3251
IJ
178#---------- packet queue ----------
179
class PacketQueue():
    """FIFO of packets, each stamped with its enqueue time.

    Packets older than *max_queue_time* (measured with time.monotonic())
    are discarded by nonempty().
    """

    def __init__(self, max_queue_time):
        self._max_queue_time = max_queue_time
        self._pq = collections.deque()  # (enqueue time, packet) pairs

    def append(self, packet):
        """Add *packet* to the back of the queue."""
        stamped = (time.monotonic(), packet)
        self._pq.append(stamped)

    def nonempty(self):
        """Drop expired packets from the front; say whether any remain."""
        while self._pq:
            enqueued_at, _packet = self._pq[0]
            expired = time.monotonic() - enqueued_at > self._max_queue_time
            if not expired:
                return True
            # strip old packets off the front
            self._pq.popleft()
        return False

    def process(self, sizequery, moredata, max_batch):
        """Move queued packets into a batch, oldest first.

        sizequery() should return the size of the batch so far.
        moredata(s) should add s to the batch.

        Packets are separated by a SLIP delimiter.  Processing stops when
        the queue is empty or the next packet would take a non-empty
        batch past *max_batch*; a packet going into an empty batch is
        added without a size check.
        """
        while self._pq:
            _stamp, packet = self._pq[0]
            encoded = slip.encode(packet)
            batch_size = sizequery()

            if batch_size > 0:
                needed = batch_size + len(slip.delimiter) + len(encoded)
                if needed > max_batch:
                    return
                moredata(slip.delimiter)

            moredata(encoded)
            # Only now is the packet removed from the queue.
            self._pq.popleft()
ae7c7784
IJ
218
219#---------- error handling ----------
220
def crash(err):
    """Report *err* on stderr and stop the reactor.

    A reactor that is not running is tolerated silently.
    """
    print('CRASH ', err, file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        # Nothing to stop.
        pass
225
def crash_on_defer(defer):
    """Attach an errback to *defer* that hands any failure to crash()."""
    def _on_failure(err):
        # crash is looked up when the errback fires, not when it is added.
        return crash(err)

    defer.addErrback(_on_failure)
228
def crash_on_critical(event):
    """Log observer: crash the program on critical-level events.

    *event* is a twisted.logger event dict.  Events at LogLevel.critical
    or above are formatted and passed to crash().  Events with no
    'log_level' (missing or None) are ignored; comparing None against a
    LogLevel would otherwise raise TypeError inside the observer.
    """
    level = event.get('log_level')
    if level is None:
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
232
87a7c0c7
IJ
233#---------- config processing ----------
234
def process_cfg_common_always():
    """Copy the [virtual] `mtu' setting into the results object `c`.

    The value is stored exactly as cfg.get() returns it (a string).
    """
    # The original also declared `global mtu' but never assigned it;
    # that declaration had no effect and is omitted.
    c.mtu = cfg.get('virtual', 'mtu')
238
88487243
IJ
def process_cfg_ipif(section, varmap):
    """Compute c.ipif_command from the `ipif' option of *section*.

    *varmap* is an iterable of (dest, source) attribute-name pairs.  For
    each pair, c.<source> is copied to c.<dest>; pairs whose source
    attribute does not exist on `c` are skipped.  The attributes of `c`
    are then supplied as extra interpolation variables for `ipif'.
    """
    for dest, src in varmap:
        try:
            value = getattr(c, src)
        except AttributeError:
            continue
        else:
            setattr(c, dest, value)

    print(repr(c))

    c.ipif_command = cfg.get(section, 'ipif', vars=c.__dict__)
248
def process_cfg_network():
    """Parse [virtual] `network' into c.network and check its size.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    c.network is assigned before the check, so it stays set even when
    the check fails.
    """
    net = ipnetwork(cfg.get('virtual', 'network'))
    c.network = net
    if net.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')
253
def process_cfg_server():
    """Set c.server from [virtual] `server', or default it.

    When the option is absent, the network is processed and the first
    host address of c.network is used.
    """
    # NOTE(review): a configured value is stored as the str returned by
    # cfg.get(), while the computed default is an ipaddress object --
    # confirm that callers cope with both.
    try:
        server = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        server = next(c.network.hosts())
    c.server = server
260
class ServerAddr():
    """One (address, port) pair the server listens on.

    *addrspec* is parsed as IPv4 first and IPv6 otherwise; the matching
    Twisted TCP server endpoint class and URL host format are selected
    accordingly.
    """

    def __init__(self, port, addrspec):
        self.port = port
        # self.addr is set in whichever branch succeeds below.
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = b'%s'
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            # IPv6 literals are bracketed inside URLs.
            self._inurl = b'[%s]'

    def make_endpoint(self):
        """Return a Twisted server endpoint bound to this address and port."""
        # The third positional parameter of TCP4ServerEndpoint and
        # TCP6ServerEndpoint is `backlog', so the address must be passed
        # by keyword as `interface', and as a string.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        """Return the http:// URL for this address, as bytes.

        The port is included only when it is not 80.
        """
        url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
        if self.port != 80:
            url += b':%d' % self.port
        url += b'/'
        return url
280
def process_cfg_saddrs():
    """Build c.saddrs: one ServerAddr per entry in [server] `addrs'.

    `addrs' is split on whitespace.  Every entry uses the [server]
    `port' value, or 80 if that option is absent.
    """
    try:
        port = cfg.getint('server', 'port')
    except NoOptionError:
        port = 80

    c.saddrs = []
    addrspecs = cfg.get('server', 'addrs').split()
    for addrspec in addrspecs:
        c.saddrs.append(ServerAddr(port, addrspec))
289
def process_cfg_clients(constructor):
    """Call *constructor* once for each client section in the config.

    A section counts as a client when its name contains `:' or `.'; the
    name is then parsed as an IP address.  *constructor* is called as
    constructor(address, section_name, password_bytes), with the
    password UTF-8 encoded.

    c.clients is reset to an empty list; this function does not add
    anything to it and ignores the constructor's return value.
    """
    c.clients = []
    for cs in cfg.sections():
        looks_like_address = ':' in cs or '.' in cs
        if not looks_like_address:
            continue
        ci = ipaddr(cs)
        pw = cfg.get(cs, 'password').encode('utf-8')
        constructor(ci, cs, pw)
298
ae7c7784
IJ
299#---------- startup ----------
300
def common_startup():
    """Install the crash observer, parse options and load configuration.

    The built-in defaults (defcfg) are loaded first, with everything
    from `#' to end of line removed, and then the file named by
    -c/--config (default /etc/hippotat/config) is read on top.
    """
    twisted.logger.globalLogPublisher.addObserver(crash_on_critical)

    optparser.add_option('-c', '--config',
                         dest='configfile',
                         default='/etc/hippotat/config')
    opts, args = optparser.parse_args()
    if len(args):
        optparser.error('no non-option arguments please')

    # Strip `#' remarks from the defaults before parsing them.
    stripped_defaults = regexp.compile('#.*').sub('', defcfg)
    cfg.read_string(stripped_defaults)
    cfg.read(opts.configfile)
312
def common_run():
    """Run the reactor; print a message on stderr once it returns."""
    reactor.run()
    # Reached only after reactor.run() has returned.
    message = 'CRASHED (end)'
    print(message, file=sys.stderr)