wip
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
import signal
signal.signal(signal.SIGINT, signal.SIG_DFL)

import sys
import time

import twisted
from twisted.internet import reactor
from twisted.logger import LogLevel
import twisted.internet.endpoints

import ipaddress
from ipaddress import AddressValueError

from optparse import OptionParser
from configparser import ConfigParser
from configparser import NoOptionError

import collections

import re as regexp

import hippotat.slip as slip
25
ca732796
IJ
# Built-in default configuration, in ConfigParser syntax.
# common_startup() strips everything from '#' to end of line and loads the
# result into `cfg` before reading the user's config file, so the trailing
# '# ...' notes below never reach the parser.
defcfg = '''
[DEFAULT]
#[<client>] overrides
max_batch_down = 65536 # used by server, subject to [limits]
max_queue_time = 10 # used by server, subject to [limits]
max_request_time = 54 # used by server, subject to [limits]
target_requests_outstanding = 3 # must match; subject to [limits] on server
max_requests_outstanding = 4 # used by client
max_batch_up = 4000 # used by client

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations: %(local)s %(peer)s %(rnet)s
# obtained on server [virtual]server [virtual]relay [virtual]network
# from on client <client> [virtual]server [virtual]routes

[virtual]
mtu = 1500
routes = ''
# network = <prefix>/<len> # mandatory for server
# server = <ipaddr> # used by both, default is computed from `network'
# relay = <ipaddr> # used by server, default from `network' and `server'
# default server is first host in network
# default relay is first host which is not server

[server]
# addrs = 127.0.0.1 ::1 # mandatory for server
port = 80 # used by server
# url # used by client; default from first `addrs' and `port'

# [<client-ip4-or-ipv6-address>]
# password = <password> # used by both, must match

[limits]
max_batch_down = 262144 # used by server
max_queue_time = 121 # used by server
max_request_time = 121 # used by server
target_requests_outstanding = 10 # used by server
'''
65
# these need to be defined here so that they can be imported by import *
# cfg:       shared ConfigParser; common_startup() loads defcfg and then the
#            user's config file into it.
# optparser: shared OptionParser; common_startup() adds -c/--config to it and
#            parses the command line.
cfg = ConfigParser()
optparser = OptionParser()
69
87a7c0c7
IJ
class ConfigResults:
    """Plain attribute bag holding the processed configuration values.

    If a dict `d` is supplied it is adopted (not copied) as the instance's
    attribute dictionary, so later attribute assignments are visible in `d`.
    """

    def __init__(self, d=None):
        # A fresh dict per instance.  A literal `{}` default would be created
        # once and become the __dict__ of *every* default-constructed
        # instance, making them all share attributes.
        self.__dict__ = {} if d is None else d

    def __repr__(self):
        return 'ConfigResults(' + repr(self.__dict__) + ')'


# Module-wide results object filled in by the process_cfg_* functions.
c = ConfigResults()
77
1321ad5f
IJ
def log_discard(packet, saddr, daddr, why):
    """Report on stdout that a packet was dropped.

    `packet` is accepted but not used; only the two addresses and the
    reason `why` are printed.
    """
    fields = ('DROP ', saddr, daddr, why)
    print(*fields)
    # Disabled syslog variant:
    # syslog.syslog(syslog.LOG_DEBUG,
    #               'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
82
b0cfbfce
IJ
83#---------- packet parsing ----------
84
def packet_addrs(packet):
    """Return (source, destination) addresses read from a raw IP packet.

    The IP version is taken from the high nibble of the first byte.

    Raises ValueError for any version other than 4 or 6.
    """
    version = packet[0] >> 4

    # version -> (address length, offset of source address, address class)
    layouts = {
        4: (4, 3*4, ipaddress.IPv4Address),
        6: (16, 2*4, ipaddress.IPv6Address),
    }
    layout = layouts.get(version)
    if layout is None:
        raise ValueError('unsupported IP version %d' % version)
    addrlen, saddroff, factory = layout

    # The destination address immediately follows the source address.
    daddroff = saddroff + addrlen
    saddr = factory(packet[saddroff:daddroff])
    daddr = factory(packet[daddroff:daddroff + addrlen])
    return (saddr, daddr)
100
101#---------- address handling ----------
102
def ipaddr(input):
    """Parse `input` as an IPv4 address, falling back to IPv6.

    If IPv6 parsing fails as well, its AddressValueError propagates.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        return ipaddress.IPv6Address(input)
109
def ipnetwork(input):
    """Parse `input` as an IPv4 network, falling back to IPv6.

    Only an unparseable IPv4 *address part* triggers the IPv6 attempt.
    Other errors (bad prefix length, host bits set) propagate unchanged so
    a malformed IPv4 network is not misreported as a bad IPv6 one.
    """
    try:
        r = ipaddress.IPv4Network(input)
    except AddressValueError:
        # The previous handler named `NetworkValueError`, which does not
        # exist, so every IPv6 network ended in NameError.
        r = ipaddress.IPv6Network(input)
    return r
040ff511
IJ
116
117#---------- ipif (SLIP) subprocess ----------
118
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    Data arriving from the subprocess is accumulated, SLIP-decoded, and
    each resulting packet is passed to `router(packet, saddr, daddr)`.
    """

    def __init__(self, router):
        self._router = router
        self._buffer = b''   # received bytes not yet decoded into packets

    def connectionMade(self):
        pass

    def outReceived(self, data):
        #print('IPIF-GOT ', repr(data))
        self._buffer += data
        decoded = slip.decode(self._buffer)
        # The last element is kept as the new buffer -- presumably the
        # trailing, still-incomplete fragment (confirm against slip.decode).
        self._buffer = decoded.pop()
        for pkt in decoded:
            if len(pkt) == 0:
                continue
            (src, dst) = packet_addrs(pkt)
            if src.is_link_local or dst.is_link_local:
                log_discard(pkt, src, dst, 'link-local')
                continue
            self._router(pkt, src, dst)

    def processEnded(self, status):
        # Any exit of the subprocess is surfaced as an exception.
        status.raiseException()
138
def start_ipif(command, router):
    """Spawn `command` under `/bin/sh -xc` as the ipif subprocess.

    The protocol object is stored in the module global `ipif`; `router` is
    the callback it invokes for each packet received from the subprocess.
    """
    global ipif
    ipif = _IpifProcessProtocol(router)
    argv = ['sh', '-xc', command]
    # fd 0: we write to the child; fd 1: we read from it; fd 2: inherited.
    fdmap = {0: 'w', 1: 'r', 2: 2}
    reactor.spawnProcess(ipif, '/bin/sh', argv, childFDs=fdmap)
145
def queue_inbound(packet):
    """Write `packet` to the ipif subprocess, SLIP-encoded.

    A delimiter is written both before and after the encoded packet.
    """
    out = ipif.transport
    out.write(slip.delimiter)
    out.write(slip.encode(packet))
    out.write(slip.delimiter)
150
650a3251
IJ
151#---------- packet queue ----------
152
class PacketQueue():
    """FIFO of packets which drops entries queued for too long.

    Each packet is stored with its enqueue time (time.monotonic()).
    Packets older than `max_queue_time` seconds are discarded lazily, when
    nonempty() is called.
    """

    def __init__(self, max_queue_time):
        self._max_queue_time = max_queue_time
        self._pq = collections.deque()  # (enqueue time, packet) pairs

    def append(self, packet):
        """Add `packet` to the back of the queue, timestamped now."""
        self._pq.append((time.monotonic(), packet))

    def nonempty(self):
        """Discard expired packets from the front; return whether any remain."""
        while self._pq:
            (queuetime, _packet) = self._pq[0]
            age = time.monotonic() - queuetime
            # Was `self.max_queue_time`, an attribute that is never set.
            if age <= self._max_queue_time:
                return True
            # strip old packets off the front
            self._pq.popleft()
        return False

    def popleft(self):
        """Remove and return the oldest packet, or None if the queue is empty.

        The caller is expected to have checked nonempty() first; no expiry
        check is done here.
        """
        # Actually dequeue the entry: merely peeking at self._pq[0] would
        # hand out the same packet on every call.
        try:
            (_queuetime, packet) = self._pq.popleft()
        except IndexError:
            return None
        return packet
ae7c7784
IJ
179
180#---------- error handling ----------
181
def crash(err):
    """Report `err` on stderr and stop the reactor.

    A reactor that is not running is tolerated silently.
    """
    print('CRASH ', err, file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        # Nothing to stop.
        pass
186
def crash_on_defer(defer):
    """Attach an errback to `defer` that passes any failure to crash()."""
    def _on_failure(failure):
        return crash(failure)
    defer.addErrback(_on_failure)
189
def crash_on_critical(event):
    """Log observer: crash() on any event of critical level or above.

    Events carrying no log level are ignored.
    """
    level = event.get('log_level')
    if level is None:
        # Comparing None with a LogLevel raises TypeError, and an exception
        # inside a log observer only generates further log traffic.
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
193
87a7c0c7
IJ
194#---------- config processing ----------
195
def process_cfg_common_always():
    """Copy the `mtu` setting from [virtual] into c.mtu.

    The value is stored as the string returned by cfg.get().
    """
    # No `global mtu` here: the value lives on `c`, and this function never
    # assigned a module-level `mtu`.
    c.mtu = cfg.get('virtual', 'mtu')
199
88487243
IJ
def process_cfg_ipif(section, varmap):
    """Compute c.ipif_command from the `ipif` setting in `section`.

    `varmap` is an iterable of (dest, src) attribute-name pairs: each
    attribute `src` that exists on `c` is copied to attribute `dest`.
    Sources that are not set are skipped.  The attributes of `c` are then
    offered as extra interpolation variables for the `ipif` value.
    """
    missing = object()
    for dest, src in varmap:
        value = getattr(c, src, missing)
        if value is not missing:
            setattr(c, dest, value)

    print(repr(c))

    c.ipif_command = cfg.get(section, 'ipif', vars=c.__dict__)
209
def process_cfg_network():
    """Parse the `network` setting from [virtual] into c.network.

    Raises ValueError if the network has fewer than 5 addresses.
    """
    spec = cfg.get('virtual', 'network')
    c.network = ipnetwork(spec)
    # Network sizes are powers of two, so "fewer than 3 + 2" rejects
    # everything below 8 addresses, hence the 2^3 in the message.
    min_addresses = 3 + 2
    if c.network.num_addresses < min_addresses:
        raise ValueError('network needs at least 2^3 addresses')
214
def process_cfg_server():
    """Set c.server from the `server` setting in [virtual].

    When the option is absent, the network is processed and its first
    host address is used instead.  A configured value is stored as the
    string read from the config; the computed default is an address object.
    """
    try:
        configured = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        c.server = next(c.network.hosts())
    else:
        c.server = configured
221
class ServerAddr():
    """One address/port on which the server listens.

    Attributes:
      port -- TCP port number
      addr -- ipaddress.IPv4Address or ipaddress.IPv6Address
    """

    def __init__(self, port, addrspec):
        self.port = port
        # Try IPv4 first.  Only the address parse sits in the try, so an
        # unrelated failure cannot be mistaken for "not an IPv4 address".
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            self._inurl = '[%s]'  # IPv6 literals are bracketed in URLs
        else:
            self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = '%s'

    def make_endpoint(self):
        """Return a Twisted server endpoint bound to this address and port."""
        # The third positional parameter of the TCP server endpoints is
        # `backlog`, so the address must be passed by keyword, as a string.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        """Return the http:// URL for this address; port 80 is left implicit."""
        url = 'http://' + (self._inurl % self.addr)
        if self.port != 80:
            url += ':%d' % self.port
        url += '/'
        return url
241
def process_cfg_saddrs():
    """Build c.saddrs: one ServerAddr per address in the [server] `addrs` setting.

    `addrs` is split on whitespace.  Every address uses the `port` setting
    from [server], or 80 if that option is absent.
    """
    try:
        port = cfg.getint('server', 'port')
    except NoOptionError:
        port = 80

    c.saddrs = saddrs = []
    for spec in cfg.get('server', 'addrs').split():
        saddrs.append(ServerAddr(port, spec))
250
def process_cfg_clients(constructor):
    """Call `constructor(ipaddr, section_name, password)` for each client section.

    A config section is treated as a client when its name contains ':' or
    '.'.  c.clients is reset to an empty list first; this function itself
    adds nothing to it.
    """
    c.clients = []
    for name in cfg.sections():
        if ':' not in name and '.' not in name:
            continue
        addr = ipaddr(name)
        password = cfg.get(name, 'password')
        constructor(addr, name, password)
258
ae7c7784
IJ
259#---------- startup ----------
260
def common_startup():
    """Shared startup: install the crash observer, parse argv, load config.

    The built-in defaults (defcfg, with '#' comments stripped) are loaded
    first, then the file named by -c/--config.
    """
    twisted.logger.globalLogPublisher.addObserver(crash_on_critical)

    optparser.add_option('-c', '--config', dest='configfile',
                         default='/etc/hippotat/config')
    opts, args = optparser.parse_args()
    if args:
        optparser.error('no non-option arguments please')

    # Remove everything from '#' to end of line before parsing the defaults.
    defaults = regexp.sub('#.*', '', defcfg)
    cfg.read_string(defaults)
    cfg.read(opts.configfile)
272
def common_run():
    """Run the reactor until it stops, then note the exit on stderr."""
    reactor.run()
    # Reached only once the reactor has stopped.
    print('CRASHED (end)', file=sys.stderr)