bd5d2c3cf58262989707c279c00691e5a969a29b
[hippotat] / hippotatlib / __init__.py
1 # -*- python -*-
2 #
3 # Hippotat - Asinine IP Over HTTP program
4 # hippotatlib/__init__.py - common library code
5 #
6 # Copyright 2017 Ian Jackson
7 #
8 # GPLv3+
9 #
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program, in the file GPLv3. If not,
22 # see <http://www.gnu.org/licenses/>.
23
24
25 import signal
26 signal.signal(signal.SIGINT, signal.SIG_DFL)
27
28 import sys
29 import os
30
31 from zope.interface import implementer
32
33 import twisted
34 from twisted.internet import reactor
35 import twisted.internet.endpoints
36 import twisted.logger
37 from twisted.logger import LogLevel
38 import twisted.python.constants
39 from twisted.python.constants import NamedConstant
40
41 import ipaddress
42 from ipaddress import AddressValueError
43
44 from optparse import OptionParser
45 import configparser
46 from configparser import ConfigParser
47 from configparser import NoOptionError
48
49 from functools import partial
50
51 import collections
52 import time
53 import codecs
54 import traceback
55
56 import re as regexp
57
58 import hippotatlib.slip as slip
59
# Debug flags.  Each constant names one category of debug output that can be
# switched on or off via debug_set.  The definition order is significant:
# dfs_less_detailed() in common_startup compares flags with `<=', so flags
# listed earlier count as "less detailed" than flags listed later.
class DBG(twisted.python.constants.Names):
    INIT = NamedConstant()
    CONFIG = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    OWNSOURCE = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
    # When set, log_debug dumps packet data in full instead of truncating it.
    DATA_COMPLETE = NamedConstant()
76
# Encoder used by log_debug to render packet data as hexadecimal text.
_hex_codec = codecs.getencoder('hex_codec')

#---------- logging ----------

# sys.stderr as it was when this module was imported; used to report
# failures that happen inside the log filtering code itself.
org_stderr = sys.stderr

log = twisted.logger.Logger()

# The set of DBG flags that are currently enabled.
debug_set = set()
# Detail level used by the -D/--debug option: that option enables every
# flag up to and including this one.
debug_def_detail = DBG.HTTP
87
def log_debug(dflag, msg, idof=None, d=None):
    """Emit a debug message, provided that `dflag` is enabled.

    dflag -- the DBG flag this message belongs to; nothing is logged
             unless it is a member of debug_set.
    msg   -- the message text.
    idof  -- optional object; if given, its id() is prefixed to the
             message in hex so that related messages can be correlated.
    d     -- optional bytes; if given, a hex dump is appended.  The dump
             is truncated to 64 bytes (and marked with `...') unless
             DBG.DATA_COMPLETE is enabled.
    """
    if dflag not in debug_set:
        return

    text = msg
    if idof is not None:
        text = '[%#x] %s' % (id(idof), text)

    if d is not None:
        suffix = ''
        if DBG.DATA_COMPLETE not in debug_set and len(d) > 64:
            d = d[0:64]
            suffix = '...'
        hexdump = _hex_codec(d)[0].decode('ascii')
        text += ' ' + hexdump + suffix

    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=text)
102
def logevent_is_boringtwisted(event):
    """Say whether a log event is uninteresting and should be suppressed.

    Only info-level events can be boring.  An info-level event is kept
    (not boring) when its `dflag' is False, when its `dflag' is an
    enabled debug flag, or when it has no `dflag' at all and DBG.TWISTED
    is enabled.

    If the check itself fails, the traceback is printed to the original
    stderr and the event is treated as not boring.
    """
    try:
        if event.get('log_level') != LogLevel.info:
            return False
        flag = event.get('dflag')
        interesting = (
            flag is False
            or flag in debug_set
            or (flag is None and DBG.TWISTED in debug_set)
        )
        return not interesting
    except Exception:
        print('EXCEPTION (IN BORINGTWISTED CHECK)',
              traceback.format_exc(), file=org_stderr)
        return False
116
@implementer(twisted.logger.ILogFilterPredicate)
class LogNotBoringTwisted:
    """Log filter predicate which rejects boring events.

    An event is rejected exactly when logevent_is_boringtwisted() says
    it is boring; every other event is accepted.
    """

    def __call__(self, event):
        if logevent_is_boringtwisted(event):
            return twisted.logger.PredicateResult.no
        return twisted.logger.PredicateResult.yes
125
126 #---------- default config ----------
127
# Built-in default configuration, in ConfigParser syntax.  common_startup
# strips everything from each `#' to the end of its line and then loads the
# result into cfg before any configuration file is read.
defcfg = '''
[COMMON]
max_batch_down = 65536
max_queue_time = 10
target_requests_outstanding = 3
http_timeout = 30
http_timeout_grace = 5
max_requests_outstanding = 6
max_batch_up = 4000
http_retry = 5
port = 80
vroutes = ''
ifname_client = hippo%%d
ifname_server = shippo%%d

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip,%(ifname)s %(rnets)s

# relating to virtual network
mtu = 1500

# addrs = 127.0.0.1 ::1
# url

# relating to virtual network
vvnetwork = 172.24.230.192
# vnetwork = <prefix>/<len>
# vaddr = <ipaddr>
# vrelay = <ipaddr>


# [<client-ip4-or-ipv6-address>]
# password = <password> # used by both, must match

[LIMIT]
max_batch_down = 262144
max_queue_time = 121
http_timeout = 121
target_requests_outstanding = 10
'''
168
# these need to be defined here so that they can be imported by import *
# cfg holds the merged configuration (built-in defaults plus files read);
# strict=False lets the same section or option appear more than once.
cfg = ConfigParser(strict=False)
# Shared command line parser; common_startup registers the common options
# on it and then parses the command line.
optparser = OptionParser()
172
# Translation table which exchanges the byte `-' with the SLIP ESC byte.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')

def mime_translate(s):
    """Return `s' with every `-' and SLIP ESC byte exchanged.

    SLIP-encoded packets cannot contain ESC ESC, so after the swap the
    result cannot contain `--'.  Applying the translation twice gives
    back the original bytes.
    """
    translated = s.translate(_mimetrans)
    return translated
178
class ConfigResults:
    """Plain attribute bag which the cfg_process_* functions fill in."""

    def __init__(self):
        """Create an empty result object; attributes are added later."""

    def __repr__(self):
        return 'ConfigResults(%r)' % (self.__dict__,)
184
def log_discard(packet, iface, saddr, daddr, why):
    """Record, under DBG.DROP, that a packet has been thrown away.

    packet       -- the packet data, included as a hex dump.
    iface        -- short name of where the packet came from.
    saddr, daddr -- the packet's source and destination addresses.
    why          -- short reason for discarding it.
    """
    summary = 'discarded packet [%s] %s -> %s: %s' % (
        iface, saddr, daddr, why)
    log_debug(DBG.DROP, summary, d=packet)
189
190 #---------- packet parsing ----------
191
def packet_addrs(packet):
    """Extract the source and destination addresses from an IP packet.

    packet -- the raw packet as bytes, starting with the IP header.

    Returns (saddr, daddr) as ipaddress.IPv4Address or
    ipaddress.IPv6Address objects, according to the version nibble in
    the first byte.  Raises ValueError for any other IP version.
    """
    version = packet[0] >> 4

    # version -> (address length, offset of source address, constructor)
    layouts = {
        4: (4, 3*4, ipaddress.IPv4Address),
        6: (16, 2*4, ipaddress.IPv6Address),
    }
    layout = layouts.get(version)
    if layout is None:
        raise ValueError('unsupported IP version %d' % version)
    addrlen, saddroff, factory = layout

    # The destination address immediately follows the source address.
    daddroff = saddroff + addrlen
    saddr = factory(packet[saddroff:daddroff])
    daddr = factory(packet[daddroff:daddroff + addrlen])
    return (saddr, daddr)
207
208 #---------- address handling ----------
209
def ipaddr(input):
    """Parse `input' as an IP address.

    IPv4 is tried first; if that fails with AddressValueError the input
    is parsed as IPv6 instead, and any error from that attempt
    propagates to the caller.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        return ipaddress.IPv6Address(input)
216
def ipnetwork(input):
    """Parse `input' as an IP network (address/prefix).

    IPv4 is tried first.  If the address part is not a valid IPv4
    address (AddressValueError) the input is parsed as an IPv6 network
    instead, and any error from that attempt propagates to the caller.
    Other IPv4 parsing errors (for example a bad netmask, or host bits
    set) are ValueErrors and propagate unchanged.
    """
    try:
        return ipaddress.IPv4Network(input)
    # The ipaddress module has no NetworkValueError; the exception raised
    # for "this is not an IPv4 address" is AddressValueError.
    except AddressValueError:
        return ipaddress.IPv6Network(input)
223
224 #---------- ipif (SLIP) subprocess ----------
225
class SlipStreamDecoder():
    """Reassemble SLIP packets from a byte stream delivered in pieces.

    desc      -- short description, used only in debug messages.
    on_packet -- callable invoked with each non-empty decoded packet.
    """

    def __init__(self, desc, on_packet):
        self._buffer = b''
        self._on_packet = on_packet
        self._desc = desc
        self._log('__init__')

    def _log(self, msg, **kwargs):
        """Log `msg' under DBG.SLIP_FULL, tagged with our description."""
        log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)

    def inputdata(self, data):
        """Feed more stream bytes in and deliver the packets they yield.

        The last element returned by slip.decode is kept as the new
        buffer rather than delivered (presumably it is the unterminated
        remainder -- see hippotatlib.slip for the exact contract).
        """
        self._log('inputdata', d=data)
        pending = self._buffer + data
        self._buffer = b''
        pieces = slip.decode(pending, True)
        self._buffer = pieces.pop()
        for piece in pieces:
            self._maybe_packet(piece)
        self._log('bufremain', d=self._buffer)

    def _maybe_packet(self, packet):
        """Deliver `packet' to the callback unless it is empty."""
        self._log('maybepacket', d=packet)
        if len(packet):
            self._on_packet(packet)

    def flush(self):
        """Decode whatever is buffered as one final packet and deliver it."""
        self._log('flush')
        leftover = self._buffer
        self._buffer = b''
        pieces = slip.decode(leftover)
        assert(len(pieces) == 1)
        self._maybe_packet(pieces[0])
258
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    The subprocess's stdout is treated as a SLIP stream.  Each decoded
    packet is passed to router(packet, saddr, daddr), except packets
    with a link-local source or destination, which are discarded.
    """

    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self):
        """Nothing to do when the subprocess starts."""
        pass

    def outReceived(self, data):
        """Pass subprocess stdout data to the SLIP decoder."""
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        """Route one decoded packet, dropping link-local traffic."""
        saddr, daddr = packet_addrs(packet)
        if saddr.is_link_local or daddr.is_link_local:
            log_discard(packet, 'ipif', saddr, daddr, 'link-local')
            return
        self._router(packet, saddr, daddr)

    def processEnded(self, status):
        """The subprocess has gone away: re-raise its exit status."""
        status.raiseException()
274
def start_ipif(command, router):
    """Start the ipif subprocess and return its process protocol.

    command -- shell command line, run via `/bin/sh -xc'.
    router  -- callable router(packet, saddr, daddr) which receives each
               packet read from the subprocess.

    The child's fd 0 is a pipe we write to, fd 1 is a pipe we read
    from, and fd 2 is our own fd 2.
    """
    protocol = _IpifProcessProtocol(router)
    argv = ['sh', '-xc', command]
    reactor.spawnProcess(protocol, '/bin/sh', argv,
                         childFDs={0: 'w', 1: 'r', 2: 2},
                         env=None)
    return protocol
282
def queue_inbound(ipif, packet):
    """Send one packet to the ipif subprocess.

    The SLIP-encoded packet is written to the subprocess with a
    delimiter both before and after it.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    out = ipif.transport
    out.write(slip.delimiter)
    out.write(slip.encode(packet))
    out.write(slip.delimiter)
288
289 #---------- packet queue ----------
290
class PacketQueue():
    """FIFO of packets, each stamped with the time it was queued.

    desc           -- description (a str) used in debug messages.
    max_queue_time -- packets older than this many seconds are dropped
                      by nonempty().
    """

    def __init__(self, desc, max_queue_time):
        self._desc = desc
        assert(desc + '')  # fails unless desc is a string
        self._max_queue_time = max_queue_time
        self._pq = collections.deque()  # of (queue time, packet)

    def _log(self, dflag, msg, **kwargs):
        """Log `msg' under `dflag', tagged with our description."""
        log_debug(dflag, self._desc+' pq: '+msg, **kwargs)

    def append(self, packet):
        """Add a packet to the back of the queue, stamped with now."""
        self._log(DBG.QUEUE, 'append', d=packet)
        self._pq.append((time.monotonic(), packet))

    def nonempty(self):
        """Discard expired packets from the front; say whether any remain."""
        self._log(DBG.QUEUE, 'nonempty ?')
        while self._pq:
            queued_at, packet = self._pq[0]

            age = time.monotonic() - queued_at
            if age > self._max_queue_time:
                # strip old packets off the front
                self._log(DBG.QUEUE, 'dropping (old)', d=packet)
                self._pq.popleft()
                continue

            self._log(DBG.QUEUE, 'nonempty ? nonempty.')
            return True

        self._log(DBG.QUEUE, 'nonempty ? empty.')
        return False

    def process(self, sizequery, moredata, max_batch):
        """Move queued packets into a batch, as many as will fit.

        sizequery() -- should return the size of the batch so far.
        moredata(s) -- should add the bytes s to the batch.
        max_batch   -- size limit for the batch.

        Packets are SLIP-encoded and separated by slip.delimiter.  The
        first packet of an empty batch is always added, whatever its
        size; after that a packet which would take the batch over
        max_batch stays in the queue and processing stops.
        """
        self._log(DBG.QUEUE, 'process...')
        while self._pq:
            _queued_at, packet = self._pq[0]

            self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)

            encoded = slip.encode(packet)
            sofar = sizequery()

            self._log(DBG.QUEUE_CTRL,
                      'process... (sofar=%d, max=%d) encoded'
                      % (sofar, max_batch),
                      d=encoded)

            if sofar > 0:
                needed = sofar + len(slip.delimiter) + len(encoded)
                if needed > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    break
                moredata(slip.delimiter)

            moredata(encoded)
            self._pq.popleft()
        else:
            # Reached only when the queue ran dry, not on overflow.
            self._log(DBG.QUEUE, 'process... empty')
350
351 #---------- error handling ----------
352
# Set once crash() has been called; common_run will not start the
# reactor after that.
_crashing = False

def crash(err):
    """Report a fatal error on stderr and stop the reactor.

    It is not an error for the reactor not to be running.
    """
    global _crashing
    _crashing = True
    banner_open = '========== CRASH =========='
    banner_close = '==========================='
    print(banner_open, err, banner_close, file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        pass
360 try: reactor.stop()
361 except twisted.internet.error.ReactorNotRunning: pass
362
def crash_on_defer(defer):
    """Arrange for a failure of the Deferred `defer' to crash the program."""
    def _on_failure(err):
        return crash(err)
    defer.addErrback(_on_failure)
365
def crash_on_critical(event):
    """Log observer: crash when an event of level critical or above arrives."""
    if not event.get('log_level') >= LogLevel.critical:
        return
    crash(twisted.logger.formatEvent(event))
369
370 #---------- config processing ----------
371
def _cfg_process_putatives():
    """Work out which servers and clients the config sections mention.

    Each section name in cfg is classified as one of:
      A  [LIMIT] or [COMMON]          -- skipped
      B  [<client>]                   -- an IP address
      C  [<servername>]
      D  [<servername> <client>]
      E  [<servername> LIMIT]         -- skipped
      F  server/client form whose client part is not a valid address
                                      -- warning printed, skipped
    Section names matching none of these forms are skipped silently.

    Returns (servers, clients).  `servers' maps each server name to
    itself; `clients' maps each parsed client address to the spelling
    of that address used in the section name.
    """
    servers = {}
    clients = {}
    # maps from abstract object to canonical name for cs's

    def putative(cmap, abstract, canoncs):
        # Record abstract -> canoncs; the same abstract object must not
        # turn up under two different spellings.
        try:
            current_canoncs = cmap[abstract]
        except KeyError:
            pass
        else:
            assert(current_canoncs == canoncs)
        cmap[abstract] = canoncs

    server_pat = r'[-.0-9A-Za-z]+'
    client_pat = r'[.:0-9a-f]+'
    server_re = regexp.compile(server_pat)
    serverclient_re = regexp.compile(
        server_pat + r' ' + '(?:' + client_pat + '|LIMIT)')

    for cs in cfg.sections():
        log_debug_config('putatives: section [%s]...' % (cs))

        def log_ignore(why):
            log_debug_config('putatives: section [%s] X ignore: %s'
                             % (cs, why))
            print('warning: ignoring config section [%s] (%s)' % (cs, why),
                  file=sys.stderr)

        if cs == 'LIMIT' or cs == 'COMMON':
            # plan A "[LIMIT]" or "[COMMON]"
            log_debug_config('putatives: section [%s] A ignore' % (cs))
            continue

        try:
            # plan B "[<client>]" part 1
            ci = ipaddr(cs)
        except AddressValueError:

            if server_re.fullmatch(cs):
                # plan C "[<servername>]"
                log_debug_config('putatives: section [%s] C <server>' % (cs))
                putative(servers, cs, cs)
                continue

            if serverclient_re.fullmatch(cs):
                # plan D "[<servername> <client>]" part 1
                (pss, pcs) = cs.split(' ')

                if pcs == 'LIMIT':
                    # plan E "[<servername> LIMIT]"
                    log_debug_config(
                        'putatives: section [%s] E <server> LIMIT' % (cs))
                    continue

                try:
                    # plan D "[<servername> <client>]" part 2
                    # (this is the client part of the section name, pcs)
                    ci = ipaddr(pcs)
                except AddressValueError:
                    # plan F "[<some thing we do not understand>]"
                    log_ignore('bad-addr')
                    continue

                else: # no AddressValueError
                    # plan D "[<servername> <client>]" part 3
                    log_debug_config(
                        'putatives: section [%s] D <server> <client>'
                        % (cs))
                    putative(clients, ci, pcs)
                    putative(servers, pss, pss)
                    continue

        else: # no AddressValueError
            # plan B "[<client>" part 2
            log_debug_config('putatives: section [%s] B <client>' % (cs))
            putative(clients, ci, cs)
            continue

    return (servers, clients)
448
def cfg_process_general(c, ss):
    """Set c.mtu from the `mtu' option of section `ss' (or of COMMON)."""
    mtu = cfg1getint(ss, 'mtu')
    c.mtu = mtu
451
def cfg_process_saddrs(c, ss):
    """Set c.port and c.saddrs from the `port' and `addrs' options.

    `addrs' is a whitespace-separated list of IPv4 or IPv6 addresses.
    c.saddrs becomes a list with one ServerAddr per address, each of
    which can create a listening endpoint and provide its URL.
    """
    class ServerAddr():
        def __init__(self, port, addrspec):
            self.port = port
            # also self.addr
            try:
                addr = ipaddress.IPv4Address(addrspec)
            except AddressValueError:
                addr = ipaddress.IPv6Address(addrspec)
                factory = twisted.internet.endpoints.TCP6ServerEndpoint
                inurl = b'[%s]'  # IPv6 literals are bracketed in URLs
            else:
                factory = twisted.internet.endpoints.TCP4ServerEndpoint
                inurl = b'%s'
            self.addr = addr
            self._endpointfactory = factory
            self._inurl = inurl

        def make_endpoint(self):
            return self._endpointfactory(reactor, self.port,
                                         interface=str(self.addr))

        def url(self):
            host = self._inurl % str(self.addr).encode('ascii')
            portpart = b':%d' % self.port if self.port != 80 else b''
            return b'http://' + host + portpart + b'/'

        def __repr__(self):
            return 'ServerAddr'+repr((self.port,self.addr))

    c.port = cfg1getint(ss,'port')
    c.saddrs = [ ]
    for spec in cfg1get(ss, 'addrs').split():
        c.saddrs.append(ServerAddr(c.port, spec))
481
def cfg_process_vnetwork(c, ss):
    """Set c.vnetwork from the `vnetwork' option.

    Raises ValueError if the network has fewer than 5 addresses.
    """
    network = ipnetwork(cfg1get(ss, 'vnetwork'))
    c.vnetwork = network
    too_small = network.num_addresses < 3 + 2
    if too_small:
        raise ValueError('vnetwork needs at least 2^3 addresses')
486
def cfg_process_vaddr(c, ss):
    """Set c.vaddr from the `vaddr' option, or default it.

    When `vaddr' is not configured, c.vnetwork is set up via
    cfg_process_vnetwork and the first host address of that network is
    used.  Note that a configured value is stored as the string from
    the config, whereas the default is an ipaddress object.
    """
    try:
        vaddr = cfg1get(ss, 'vaddr')
    except NoOptionError:
        cfg_process_vnetwork(c, ss)
        vaddr = next(c.vnetwork.hosts())
    c.vaddr = vaddr
493
def cfg_search_section(key,sections):
    """Return the first of `sections' which has the option `key'.

    Raises NoOptionError if none of them has it.
    """
    found = next((s for s in sections if cfg.has_option(s, key)), None)
    if found is None:
        raise NoOptionError(key, repr(sections))
    return found
499
def cfg_get_raw(*args, **kwargs):
    """Like cfg.get, but without interpolation (raw=True).

    Suitable for passing to cfg_search as the getter.
    """
    value = cfg.get(*args, raw=True, **kwargs)
    return value
503
def cfg_search(getter,key,sections):
    """Fetch `key' from the first of `sections' that has it.

    The value is obtained by calling getter(section, key).  Raises
    NoOptionError if no section has the option.
    """
    return getter(cfg_search_section(key, sections), key)
507
def cfg1get(section,key, getter=cfg.get,**kwargs):
    """Fetch `key' from `section', falling back to COMMON.

    The value is obtained by calling getter(section, key, **kwargs) on
    whichever of the two sections has the option.  Raises NoOptionError
    if neither has it.
    """
    where = cfg_search_section(key, [section, 'COMMON'])
    return getter(where, key, **kwargs)
511
def cfg1getint(section,key, **kwargs):
    """Like cfg1get, but fetches the value as an integer (cfg.getint)."""
    value = cfg1get(section, key, getter=cfg.getint, **kwargs)
    return value
514
def cfg_process_client_limited(cc,ss,sections,key):
    """Set attribute `key' of cc to its configured value, capped.

    The value is looked up as an integer in `sections'; the cap is
    looked up in "[<ss> LIMIT]" and then "[LIMIT]".  The smaller of the
    two is stored.
    """
    configured = cfg_search(cfg1getint, key, sections)
    ceiling = cfg_search(cfg1getint, key, ['%s LIMIT' % ss, 'LIMIT'])
    vars(cc)[key] = min(configured, ceiling)
519
def cfg_process_client_common(cc,ss,cs,ci):
    """Fill in the settings of cc which client and server both need.

    cc -- result object to fill in.
    ss -- server section name.
    cs -- client section name.
    ci -- client address, stored as cc.ci.

    Returns the sections to search in, iff a password is defined,
    otherwise None.  When a password is found it is stored UTF-8
    encoded as cc.password, and the limited settings
    target_requests_outstanding and http_timeout are set too.
    """
    cc.ci = ci

    # Most specific first.
    sections = ['%s %s' % (ss,cs), cs, ss, 'COMMON']

    try:
        pwsection = cfg_search_section('password', sections)
    except NoOptionError:
        return None

    cc.password = cfg1get(pwsection, 'password').encode('utf-8')

    for key in ('target_requests_outstanding', 'http_timeout'):
        cfg_process_client_limited(cc, ss, sections, key)

    return sections
539
def cfg_process_ipif(c, sections, varmap):
    """Compute c.ipif_command from the `ipif' option.

    varmap is an iterable of (destination, source) attribute name
    pairs: each source attribute which exists on c is copied to the
    destination name.  The `ipif' option is then fetched from the first
    of `sections' which has it, with c's attributes supplied as
    interpolation variables.
    """
    missing = object()
    for dest, src in varmap:
        value = getattr(c, src, missing)
        if value is not missing:
            setattr(c, dest, value)

    ipif_section = cfg_search_section('ipif', sections)
    c.ipif_command = cfg1get(ipif_section, 'ipif', vars=c.__dict__)
550
551 #---------- startup ----------
552
def log_debug_config(m):
    """Print a config debug message to stdout if DBG.CONFIG is enabled."""
    if DBG.CONFIG in debug_set:
        print('DBG.CONFIG:', m)
556
def common_startup(process_cfg):
    """Parse the command line, load the configuration and start logging.

    The built-in defaults (defcfg) are loaded first.  The common options
    (-D/--debug, --debug-select, -c/--config, --extra-config and
    --default-config) are registered on optparser and the command line
    is parsed.  The default config locations under /etc/hippotat are
    read unless -c/--config or --default-config was given.

    process_cfg is then called as
        process_cfg(opts, putative_servers, putative_clients)
    If it, or the analysis of the section names, raises
    configparser.Error or ValueError, the traceback is printed and the
    program exits with status 12.

    Finally Twisted logging is started: events of level error and above
    go to stderr and the rest to stdout, boring events are filtered out,
    and critical events crash the program.
    """
    global file_log_observer

    # ConfigParser hates #-comments after values
    trailingcomments_re = regexp.compile(r'#.*')
    cfg.read_string(trailingcomments_re.sub('', defcfg))
    need_defcfg = True

    def readconfig(pathname, mandatory=True):
        # Read one config file, or every suitably-named file in a
        # directory.  A missing path is an error only if `mandatory'.
        def log(m, p=pathname):
            log_debug_config('%s: %s' % (m, p))

        try:
            files = os.listdir(pathname)

        except FileNotFoundError:
            if mandatory: raise
            log('skipped')
            return

        except NotADirectoryError:
            cfg.read(pathname)
            log('read file')
            return

        # is a directory
        log('directory')
        # Entries whose names contain any other character are skipped.
        badname_re = regexp.compile('[^-A-Za-z0-9_]')
        for f in files:
            if badname_re.search(f): continue
            subpath = pathname + '/' + f
            try:
                os.stat(subpath)
            except FileNotFoundError:
                log('entry skipped', subpath)
                continue
            cfg.read(subpath)
            log('entry read', subpath)

    def oc_config(option, opt_str, value, parser):
        nonlocal need_defcfg
        need_defcfg = False
        readconfig(value)

    def oc_extra_config(option, opt_str, value, parser):
        readconfig(value)

    def read_defconfig():
        readconfig('/etc/hippotat/config.d',    False)
        readconfig('/etc/hippotat/passwords.d', False)
        readconfig('/etc/hippotat/master.cfg',  False)

    def oc_defconfig(option, opt_str, value, parser):
        nonlocal need_defcfg
        need_defcfg = False
        # --default-config takes no argument, so `value' carries nothing;
        # read_defconfig takes no parameters.
        read_defconfig()

    def dfs_less_detailed(dl):
        return [df for df in DBG.iterconstants() if df <= dl]

    def ds_default(option, opt_str, value, parser):
        # Reset to exactly the default selection.  debug_set is modified
        # in place, so other references to it stay valid.
        debug_set.clear()
        debug_set.update(dfs_less_detailed(debug_def_detail))

    def ds_select(option, opt_str, spec, parser):
        for it in spec.split(','):

            if it.startswith('-'):
                mutator = debug_set.discard
                it = it[1:]
            else:
                mutator = debug_set.add

            if it == '+':
                dfs = DBG.iterconstants()

            else:
                if it.endswith('+'):
                    mapper = dfs_less_detailed
                    it = it[0:len(it)-1]
                else:
                    mapper = lambda x: [x]

                try:
                    dfspec = DBG.lookupByName(it)
                except ValueError:
                    # optparser.error does not return
                    optparser.error(
                        'unknown debug flag %s in --debug-select' % it)

                dfs = mapper(dfspec)

            for df in dfs:
                mutator(df)

    optparser.add_option('-D', '--debug',
                         nargs=0,
                         action='callback',
                         help='enable default debug (to stdout)',
                         callback= ds_default)

    optparser.add_option('--debug-select',
                         nargs=1,
                         type='string',
                         metavar='[-]DFLAG[+]|[-]+,...',
                         help=
'''enable (`-': disable) each specified DFLAG;
`+': do same for all "more interesting" DFLAGSs;
just `+': all DFLAGs.
DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
                         action='callback',
                         callback= ds_select)

    optparser.add_option('-c', '--config',
                         nargs=1,
                         type='string',
                         metavar='CONFIGFILE',
                         dest='configfile',
                         action='callback',
                         callback= oc_config)

    optparser.add_option('--extra-config',
                         nargs=1,
                         type='string',
                         metavar='CONFIGFILE',
                         dest='configfile',
                         action='callback',
                         callback= oc_extra_config)

    optparser.add_option('--default-config',
                         action='callback',
                         callback= oc_defconfig)

    (opts, args) = optparser.parse_args()
    if len(args): optparser.error('no non-option arguments please')

    if need_defcfg:
        read_defconfig()

    try:
        (pss, pcs) = _cfg_process_putatives()
        process_cfg(opts, pss, pcs)
    except (configparser.Error, ValueError):
        traceback.print_exc(file=sys.stderr)
        print('\nInvalid configuration, giving up.', file=sys.stderr)
        sys.exit(12)

    log_formatter = twisted.logger.formatEventAsClassicLogText
    stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
    stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
    # Events at level error or above go to stderr, the rest to stdout.
    pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
    stdsomething_obs = twisted.logger.FilteringLogObserver(
        stderr_obs, [pred], stdout_obs
    )
    file_log_observer = twisted.logger.FilteringLogObserver(
        stdsomething_obs, [LogNotBoringTwisted()]
    )
    twisted.logger.globalLogBeginner.beginLoggingTo(
        [ file_log_observer, crash_on_critical ]
    )
722
def common_run():
    """Run the reactor until it stops, then exit with status 16.

    The reactor is not started at all if crash() has already been
    called.
    """
    log_debug(DBG.INIT, 'entering reactor')
    if not _crashing:
        reactor.run()
    print('ENDED', file=sys.stderr)
    sys.exit(16)