# Restore the default SIGINT disposition so Ctrl-C terminates the process
# immediately (SIG_DFL) instead of raising KeyboardInterrupt inside the
# Twisted reactor.
signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from zope
.interface
import implementer
12 from twisted
.internet
import reactor
13 import twisted
.internet
.endpoints
15 from twisted
.logger
import LogLevel
16 import twisted
.python
.constants
17 from twisted
.python
.constants
import NamedConstant
20 from ipaddress
import AddressValueError
22 from optparse
import OptionParser
23 from configparser
import ConfigParser
24 from configparser
import NoOptionError
26 from functools
import partial
35 import hippotat
.slip
as slip
# Debug message categories.  Each flag names a class of log_debug() traffic.
# Declaration order matters: dfs_less_detailed() selects flags with `<=`,
# so constants declared earlier are treated as "less detailed" than later
# ones (twisted NamedConstants order by declaration -- confirm against the
# twisted.python.constants docs).
class DBG(twisted.python.constants.Names):
  INIT = NamedConstant()           # program startup
  CONFIG = NamedConstant()         # configuration processing
  ROUTE = NamedConstant()          # packet routing decisions
  DROP = NamedConstant()           # discarded packets
  FLOW = NamedConstant()           # per-packet data flow
  HTTP = NamedConstant()           # HTTP handling (default detail cutoff)
  TWISTED = NamedConstant()        # events from the twisted framework
  QUEUE = NamedConstant()          # packet queue operations
  HTTP_CTRL = NamedConstant()      # HTTP control-path detail
  QUEUE_CTRL = NamedConstant()     # queue control-path detail
  HTTP_FULL = NamedConstant()      # full HTTP traces
  CTRL_DUMP = NamedConstant()      # control data dumps
  SLIP_FULL = NamedConstant()      # full SLIP encode/decode traces
  DATA_COMPLETE = NamedConstant()  # log packet data untruncated (see log_debug)
# Incremental hex encoder used by log_debug to render packet bytes.
_hex_codec = codecs.getencoder('hex_codec')

#---------- logging ----------

# Keep the real stderr so LogNotBoringTwisted can report its own failures
# even after logging is redirected.
org_stderr = sys.stderr

log = twisted.logger.Logger()

# Default detail cutoff for -D/--debug (see ds_default / dfs_less_detailed).
debug_def_detail = DBG.HTTP
def log_debug(dflag, msg, idof=None, d=None):
  # Emit a debug message if its category `dflag` is enabled in debug_set.
  # idof: object whose id() tags the message; d: raw data (bytes), rendered
  # as hex and -- unless DBG.DATA_COMPLETE is enabled -- truncated.
  if dflag not in debug_set: return
  #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
  # NOTE(review): this excerpt is incomplete -- the guards around the
  # following statements (presumably `if idof is not None:` and
  # `if d is not None:`) and the computation of `trunc` are not visible
  # here; confirm against the full file.
  msg = '[%#x] %s' % (id(idof), msg)
  if not DBG.DATA_COMPLETE in debug_set:
  d = _hex_codec(d)[0].decode('ascii')
  msg += ' ' + d + trunc
  log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
@implementer(twisted.logger.ILogFilterPredicate)
class LogNotBoringTwisted:
  # Log-filter predicate: pass events whose dflag is enabled in debug_set;
  # pass un-flagged (framework) events only when DBG.TWISTED is enabled.
  def __call__(self, event):
    yes = twisted.logger.PredicateResult.yes
    no = twisted.logger.PredicateResult.no
    # NOTE(review): excerpt incomplete -- the try/except wrapper implied by
    # the traceback print below, and the final return statements, are not
    # visible in this view.
    if event.get('log_level') != LogLevel.info:
      dflag = event.get('dflag')
      if dflag in debug_set: return yes
      if dflag is None and DBG.TWISTED in debug_set: return yes
    # Report predicate failures on the original stderr so they are never
    # swallowed by the logging pipeline being filtered.
    print(traceback.format_exc(), file=org_stderr)
95 #---------- default config ----------
100 max_batch_down = 65536 # used by server, subject to [limits]
101 max_queue_time = 10 # used by server, subject to [limits]
102 target_requests_outstanding = 3 # must match; subject to [limits] on server
103 http_timeout = 30 # used by both } must be
104 http_timeout_grace = 5 # used by both } compatible
105 max_requests_outstanding = 4 # used by client
106 max_batch_up = 4000 # used by client
107 http_retry = 5 # used by client
109 #[server] or [<client>] overrides
110 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
111 # extra interpolations: %(local)s %(peer)s %(rnet)s
112 # obtained on server [virtual]server [virtual]relay [virtual]network
113 # from on client <client> [virtual]server [virtual]routes
118 # network = <prefix>/<len> # mandatory for server
119 # server = <ipaddr> # used by both, default is computed from `network'
120 # relay = <ipaddr> # used by server, default from `network' and `server'
121 # default server is first host in network
122 # default relay is first host which is not server
125 # addrs = 127.0.0.1 ::1 # mandatory for server
126 port = 80 # used by server
127 # url # used by client; default from first `addrs' and `port'
129 # [<client-ip4-or-ipv6-address>]
130 # password = <password> # used by both, must match
133 max_batch_down = 262144 # used by server
134 max_queue_time = 121 # used by server
135 http_timeout = 121 # used by server
136 target_requests_outstanding = 10 # used by server
# these need to be defined here so that they can be imported by import *
# (shared mutable module state: the parsed config and the option parser).
cfg = ConfigParser(strict=False)
optparser = OptionParser()
# SLIP-encoded packets cannot contain ESC ESC, so after exchanging `-' and
# ESC the output can never contain `--'.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')

def mime_translate(s):
  """Swap b'-' and the SLIP ESC byte throughout `s` (bytes in, bytes out)."""
  swapped = s.translate(_mimetrans)
  return swapped
def __init__(self, d = { }):
  # NOTE(review): mutable default argument -- the shared dict is a hazard if
  # it is ever mutated; the body (presumably self.__dict__ = d) and the
  # enclosing class header are not visible in this excerpt, so left as-is.
  # NOTE(review): a `def __repr__(self):` header is missing from this view;
  # the following return belongs to it.
  return 'ConfigResults('+repr(self.__dict__)+')'
def log_discard(packet, iface, saddr, daddr, why):
  # Record a dropped packet with its interface, endpoints and reason.
  # NOTE(review): excerpt incomplete -- the call this argument list belongs
  # to (presumably log_debug(DBG.DROP, ...)) is not visible here.
    'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
162 #---------- packet parsing ----------
def packet_addrs(packet):
  # Extract (source, destination) addresses from a raw IP packet.
  # The IP version lives in the top nibble of the first byte.
  version = packet[0] >> 4
  # NOTE(review): excerpt incomplete -- the version dispatch and the
  # definitions of `saddroff` / `addrlen` (4/12 for IPv4, 16/8 for IPv6,
  # presumably) are not visible in this view.
  factory = ipaddress.IPv4Address
  factory = ipaddress.IPv6Address
  raise ValueError('unsupported IP version %d' % version)
  saddr = factory(packet[ saddroff : saddroff + addrlen ])
  daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
  return (saddr, daddr)
180 #---------- address handling ----------
# NOTE(review): excerpt incomplete -- the enclosing function header
# (presumably `def ipaddr(input):`), its `try:` and the final `return r`
# are not visible here.  Parses IPv4 first, falling back to IPv6.
r = ipaddress.IPv4Address(input)
except AddressValueError:
  r = ipaddress.IPv6Address(input)
def ipnetwork(input):
  """Parse `input` as an IPv4 network, falling back to IPv6.

  Raises ValueError (from ipaddress) if it is neither.
  """
  try:
    r = ipaddress.IPv4Network(input)
  except ValueError:
    # BUG FIX: the original caught `NetworkValueError`, a name the
    # `ipaddress` module does not define (it provides AddressValueError and
    # NetmaskValueError, both ValueError subclasses), so any non-IPv4 input
    # raised NameError instead of falling back to IPv6.
    r = ipaddress.IPv6Network(input)
  return r
196 #---------- ipif (SLIP) subprocess ----------
class SlipStreamDecoder():
  # Incremental decoder for a SLIP byte stream: buffers any trailing partial
  # packet and invokes on_packet(packet) for each completed one.
  def __init__(self, desc, on_packet):
    # NOTE(review): excerpt incomplete -- initialisation of self._desc and
    # self._buffer is not visible in this view.
    self._on_packet = on_packet
    self._log('__init__')

  def _log(self, msg, **kwargs):
    # Tag all decoder traces with our description, under DBG.SLIP_FULL.
    log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)

  def inputdata(self, data):
    # Feed raw bytes.  slip.decode splits on delimiters; the first chunk
    # continues the buffered partial packet, the last (possibly incomplete)
    # chunk becomes the new buffer.
    self._log('inputdata', d=data)
    packets = slip.decode(data)
    packets[0] = self._buffer + packets[0]
    self._buffer = packets.pop()
    for packet in packets:
      self._maybe_packet(packet)
    self._log('bufremain', d=self._buffer)

  def _maybe_packet(self, packet):
    self._log('maybepacket', d=packet)
    # NOTE(review): excerpt incomplete -- the guard before this callback
    # (presumably skipping empty packets) is not visible.
    self._on_packet(packet)

  # NOTE(review): excerpt incomplete -- a flush-style method header is
  # missing from this view; the following call delivers the buffered
  # remainder as a final packet.
    self._maybe_packet(self._buffer)
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
  # Bridges the ipif (SLIP) subprocess: its stdout is SLIP-decoded and each
  # decoded packet is handed to the router callback.
  def __init__(self, router):
    self._router = router
    self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
  def connectionMade(self): pass
  def outReceived(self, data):
    # Subprocess stdout -> SLIP decoder.
    self._decoder.inputdata(data)
  def slip_on_packet(self, packet):
    (saddr, daddr) = packet_addrs(packet)
    if saddr.is_link_local or daddr.is_link_local:
      log_discard(packet, 'ipif', saddr, daddr, 'link-local')
    # NOTE(review): excerpt incomplete -- an `else:` (or `return`) between
    # the discard and the route call is not visible here; as shown, a
    # discarded packet would also be routed.  Confirm against the full file.
    self._router(packet, saddr, daddr)
  def processEnded(self, status):
    # Re-raise the subprocess termination as an exception so the program's
    # crash path sees it.
    status.raiseException()
def start_ipif(command, router):
  # Launch the ipif SLIP subprocess under `sh -xc` and wire its stdio to the
  # protocol (fd 0 writable, fd 1 readable, stderr shared with ours).
  # NOTE(review): excerpt incomplete -- the apparent `global ipif`
  # declaration and the close of the spawnProcess call are not visible here.
  ipif = _IpifProcessProtocol(router)
  reactor.spawnProcess(ipif,
                       '/bin/sh',['sh','-xc', command],
                       childFDs={0:'w', 1:'r', 2:2},
def queue_inbound(packet):
  """Write one packet to the ipif subprocess, framed by SLIP delimiters."""
  log_debug(DBG.FLOW, "queue_inbound", d=packet)
  stdin = ipif.transport
  for chunk in (slip.delimiter, slip.encode(packet), slip.delimiter):
    stdin.write(chunk)
257 #---------- packet queue ----------
def __init__(self, desc, max_queue_time):
  # NOTE(review): excerpt incomplete -- the enclosing class header (a packet
  # queue) and the assignment of self._desc are not visible in this view.
  self._max_queue_time = max_queue_time
  self._pq = collections.deque() # (monotonic enqueue time, packet) pairs
def _log(self, dflag, msg, **kwargs):
  """Log `msg` under `dflag`, prefixed with this queue's description."""
  prefixed = self._desc + ' pq: ' + msg
  log_debug(dflag, prefixed, **kwargs)
def append(self, packet):
  """Enqueue `packet`, stamped with its (monotonic-clock) arrival time."""
  self._log(DBG.QUEUE, 'append', d=packet)
  entry = (time.monotonic(), packet)
  self._pq.append(entry)
# NOTE(review): excerpt incomplete -- the method header (presumably
# `def nonempty(self):`), the try/except around the head peek, the popleft
# of expired entries, and the return statements are not visible here.
# Intent per the visible code: peek at the head, expire entries older than
# self._max_queue_time, and report whether a live packet remains.
  self._log(DBG.QUEUE, 'nonempty ?')
  try: (queuetime, packet) = self._pq[0]
    self._log(DBG.QUEUE, 'nonempty ? empty.')
  age = time.monotonic() - queuetime
  if age > self._max_queue_time:
    # strip old packets off the front
    self._log(DBG.QUEUE, 'dropping (old)', d=packet)
  self._log(DBG.QUEUE, 'nonempty ? nonempty.')
def process(self, sizequery, moredata, max_batch):
  # Drain queued packets into a batch, stopping before exceeding max_batch.
  # sizequery() should return size of batch so far
  # moredata(s) should add s to batch
  # NOTE(review): excerpt incomplete -- the loop structure, queue pops,
  # computation of `sofar`, and emission of `encoded` are not visible in
  # this view; only the skeleton below survives.
  self._log(DBG.QUEUE, 'process...')
  try: (dummy, packet) = self._pq[0]
    self._log(DBG.QUEUE, 'process... empty')
  self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
  encoded = slip.encode(packet)
  self._log(DBG.QUEUE_CTRL,
            'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
  # Stop (leave the packet queued) if adding it would overflow the batch.
  if sofar + len(slip.delimiter) + len(encoded) > max_batch:
    self._log(DBG.QUEUE_CTRL, 'process... overflow')
  moredata(slip.delimiter)
319 #---------- error handling ----------
# NOTE(review): excerpt incomplete -- the enclosing `def crash(err):` and
# the reactor-stop attempt this `except` belongs to are not visible here.
# Visible behaviour: report the error on stderr; ignore "reactor already
# stopped" when shutting it down.
print('========== CRASH ==========', err,
      '===========================', file=sys.stderr)
except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
  """Attach an errback that crashes the whole program on any failure."""
  defer.addErrback(lambda failure: crash(failure))
def crash_on_critical(event):
  """Log observer: bring the program down on any critical-or-worse event."""
  if event.get('log_level') < LogLevel.critical:
    return
  crash(twisted.logger.formatEvent(event))
338 #---------- config processing ----------
def process_cfg_common_always():
  # Config processing shared by client and server.
  # NOTE(review): excerpt incomplete -- at least one statement before this
  # assignment is not visible in this view.
  c.mtu = cfg.get('virtual','mtu')
def process_cfg_ipif(section, varmap):
  # Build c.ipif_command from the configured `ipif` template, interpolating
  # the values accumulated in c.__dict__.
  # NOTE(review): excerpt incomplete -- the loop over varmap (which defines
  # `s`) and the assignments back into `c` are not visible in this view.
  try: v = getattr(c, s)
  except AttributeError: continue
  c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
def process_cfg_network():
  # Parse the [virtual] network and enforce a minimum usable size.
  c.network = ipnetwork(cfg.get('virtual','network'))
  if c.network.num_addresses < 3 + 2:
    # NOTE(review): the guard requires >= 5 addresses but the message claims
    # "2^3" (8); one of the two is wrong -- confirm intent before changing
    # either the threshold or the message.
    raise ValueError('network needs at least 2^3 addresses')
def process_cfg_server():
  # Determine c.server: the explicit [virtual] server setting, else the
  # first host address of the virtual network.
  # NOTE(review): excerpt incomplete -- the `try:` this except belongs to,
  # and any trailing normalisation of c.server, are not visible here.
  c.server = cfg.get('virtual','server')
  except NoOptionError:
    process_cfg_network()
    c.server = next(c.network.hosts())
def __init__(self, port, addrspec):
  # Parse addrspec as IPv4, falling back to IPv6; remember the matching
  # twisted server-endpoint factory and the URL host form (IPv6 hosts are
  # bracketed in URLs).
  # NOTE(review): excerpt incomplete -- the enclosing class header
  # (ServerAddr), the self.port assignment, the `try:`, and the IPv4-case
  # _inurl assignment are not visible in this view.
  self.addr = ipaddress.IPv4Address(addrspec)
  self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
  except AddressValueError:
    self.addr = ipaddress.IPv6Address(addrspec)
    self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
    self._inurl = b'[%s]'
def make_endpoint(self):
  """Create the listening TCP endpoint for this address on the reactor."""
  factory = self._endpointfactory
  return factory(reactor, self.port, self.addr)
# NOTE(review): excerpt incomplete -- the method header (presumably
# `def url(self):`) and the final return are not visible here.  Builds the
# client-facing base URL, omitting the port when it is the HTTP default.
url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
if self.port != 80: url += b':%d' % self.port
def process_cfg_saddrs():
  # Build a ServerAddr for every whitespace-separated [server] addrs entry,
  # defaulting the port to 80.
  # NOTE(review): excerpt incomplete -- initialisation of the collection and
  # the append/URL bookkeeping after constructing `sa` are not visible here.
  try: port = cfg.getint('server','port')
  except NoOptionError: port = 80
  for addrspec in cfg.get('server','addrs').split():
    sa = ServerAddr(port, addrspec)
def process_cfg_clients(constructor):
  # Invoke constructor(ci, cs, pw) for each client section: a section whose
  # name contains `.` or `:` is taken to be a client IP address.
  # NOTE(review): excerpt incomplete -- the derivation of `ci` from `cs`
  # (presumably ipaddr(cs)) is not visible in this view.
  for cs in cfg.sections():
    if not (':' in cs or '.' in cs): continue
    pw = cfg.get(cs, 'password')
    pw = pw.encode('utf-8')
    constructor(ci,cs,pw)
404 #---------- startup ----------
def common_startup():
  # Load the built-in default configuration, stripping `#` comments first.
  # NOTE(review): excerpt incomplete -- later statements of this function
  # are not visible; `regexp` (the re module alias) and `defcfg` are
  # defined elsewhere in the file.  The local name `re` shadows nothing
  # here only because the module is imported as `regexp`.
  re = regexp.compile('#.*')
  cfg.read_string(re.sub('', defcfg))
def readconfig(pathname, mandatory=True):
  # Read configuration from a file, or from every safely-named entry of a
  # directory (names restricted to [-A-Za-z0-9_]).
  # NOTE(review): excerpt incomplete -- the try blocks these excepts belong
  # to, the cfg.read_file calls, the `mandatory` handling, and the source of
  # `cdir` are not visible in this view.
  def log(m, p=pathname):
    # NOTE(review): this helper ignores its `p` argument and always prints
    # the outer `pathname`, even though call sites pass `subpath` -- looks
    # like a bug; confirm against the full file.
    if not DBG.CONFIG in debug_set: return
    print('DBG.CONFIG: %s: %s' % (m, pathname))
  files = os.listdir(pathname)
  except FileNotFoundError:
  except NotADirectoryError:
  re = regexp.compile('[^-A-Za-z0-9_]')
  for f in os.listdir(cdir):
    if re.search(f): continue
    subpath = pathname + '/' + f
    except FileNotFoundError:
      log('entry skipped', subpath)
    log('entry read', subpath)
def oc_config(od, os, value, op):
  # optparse callback for -c/--config (optparse callback signature:
  # option, opt_str, value, parser).
  # NOTE(review): excerpt incomplete -- the body is not visible in this
  # view.  The parameter `os` shadows the `os` module inside this function.
def dfs_less_detailed(dl):
  """Return every DBG flag at or below detail level `dl`, in order."""
  return [flag for flag in DBG.iterconstants() if flag <= dl]
def ds_default(od, os, dl, op):
  # optparse callback for -D/--debug: enable the default debug flag set.
  # NOTE(review): excerpt incomplete -- a `global debug_set` declaration
  # must precede this assignment in the full file for it to take effect;
  # it is not visible in this view.
  debug_set = set(dfs_less_detailed(debug_def_detail))
def ds_select(od, os, spec, op):
  # optparse callback for --debug-select: parse a comma-separated spec of
  # [-]DFLAG[+] items, adding (or, with a leading '-', removing) flags from
  # debug_set; a trailing '+' also selects all "less detailed" flags and a
  # bare '+' selects every flag.
  # NOTE(review): excerpt incomplete -- stripping the leading '-', handling
  # the trailing '+', the try around lookupByName, and the application of
  # mapper/mutator are not visible in this view.
  for it in spec.split(','):
    if it.startswith('-'):
      mutator = debug_set.discard
      mutator = debug_set.add
    dfs = DBG.iterconstants()
    mapper = dfs_less_detailed
    mapper = lambda x: [x]
    dfspec = DBG.lookupByName(it)
    optparser.error('unknown debug flag %s in --debug-select' % it)
# Command-line option declarations and argument parsing, followed by the
# default config file loads (both optional, hence mandatory=False).
# NOTE(review): excerpt incomplete -- several add_option keyword arguments
# (action=, type=, nargs=, help=/default for -c) and the close of each call
# are not visible in this view.
optparser.add_option('-D', '--debug',
                     help='enable default debug (to stdout)',
                     callback=ds_default)
optparser.add_option('--debug-select',
                     metavar='[-]DFLAG[+]|[-]+,...',
                     '''enable (`-': disable) each specified DFLAG;
`+': do same for all "more interesting" DFLAGSs;
just `+': all DFLAGs.
DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
optparser.add_option('-c', '--config',
                     metavar='CONFIGFILE',
(opts, args) = optparser.parse_args()
if len(args): optparser.error('no non-option arguments please')
readconfig('/etc/hippotat/config', False)
readconfig('/etc/hippotat/config.d', False)
# Wire up twisted logging: events at error-and-above go to stderr, the rest
# to stdout, further filtered by LogNotBoringTwisted; critical events also
# crash the program via crash_on_critical.  Then enter the reactor.
# NOTE(review): excerpt incomplete -- the closing parentheses of several
# calls, the definition of `_crashing`, and the enclosing context (possibly
# a function body) are not visible in this view.
log_formatter = twisted.logger.formatEventAsClassicLogText
stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
stdsomething_obs = twisted.logger.FilteringLogObserver(
    stderr_obs, [pred], stdout_obs
log_observer = twisted.logger.FilteringLogObserver(
    stdsomething_obs, [LogNotBoringTwisted()]
#log_observer = stdsomething_obs
twisted.logger.globalLogBeginner.beginLoggingTo(
    [ log_observer, crash_on_critical ]
log_debug(DBG.INIT, 'entering reactor')
if not _crashing: reactor.run()
print('CRASHED (end)', file=sys.stderr)