3 # Hippotat - Asinine IP Over HTTP program
4 # hippotatlib/__init__.py - common library code
6 # Copyright 2017 Ian Jackson
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 # You should have received a copy of the GNU General Public License
21 # along with this program, in the file GPLv3. If not,
22 # see <http://www.gnu.org/licenses/>.
# Restore the default SIGINT disposition, replacing Python's own handler
# (the one that raises KeyboardInterrupt).
signal.signal(signal.SIGINT, signal.SIG_DFL)
31 from zope
.interface
import implementer
34 from twisted
.internet
import reactor
35 import twisted
.internet
.endpoints
37 from twisted
.logger
import LogLevel
38 import twisted
.python
.constants
39 from twisted
.python
.constants
import NamedConstant
42 from ipaddress
import AddressValueError
44 from optparse
import OptionParser
46 from configparser
import ConfigParser
47 from configparser
import NoOptionError
49 from functools
import partial
58 import hippotatlib
.slip
as slip
class DBG(twisted.python.constants.Names):
  # Debug flags.  log_debug() emits a message only when its flag is a
  # member of debug_set.
  #
  # Definition order is significant: dfs_less_detailed(dl) selects every
  # flag that compares <= dl, and the -D option enables everything up to
  # debug_def_detail (DBG.HTTP).  The --debug-select help text describes
  # the earlier flags as the "more interesting" ones.
  # NOTE(review): this relies on NamedConstants comparing in definition
  # order, as described in Twisted's constants documentation -- confirm
  # before reordering or inserting flags.
  INIT = NamedConstant()
  CONFIG = NamedConstant()
  ROUTE = NamedConstant()
  DROP = NamedConstant()
  OWNSOURCE = NamedConstant()
  FLOW = NamedConstant()
  HTTP = NamedConstant()
  TWISTED = NamedConstant()
  QUEUE = NamedConstant()
  HTTP_CTRL = NamedConstant()
  QUEUE_CTRL = NamedConstant()
  HTTP_FULL = NamedConstant()
  CTRL_DUMP = NamedConstant()
  SLIP_FULL = NamedConstant()
  DATA_COMPLETE = NamedConstant()
77 _hex_codec
= codecs
.getencoder('hex_codec')
79 #---------- logging ----------
81 org_stderr
= sys
.stderr
83 log
= twisted
.logger
.Logger()
86 debug_def_detail
= DBG
.HTTP
88 def log_debug(dflag
, msg
, idof
=None, d
=None):
89 if dflag
not in debug_set
: return
90 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
92 msg
= '[%#x] %s' %
(id(idof
), msg
)
95 if not DBG
.DATA_COMPLETE
in debug_set
:
99 d
= _hex_codec(d
)[0].decode('ascii')
100 msg
+= ' ' + d
+ trunc
101 log
.info('{dflag} {msgcore}', dflag
=dflag
, msgcore
=msg
)
103 def logevent_is_boringtwisted(event
):
105 if event
.get('log_level') != LogLevel
.info
:
107 dflag
= event
.get('dflag')
108 if dflag
is False : return False
109 if dflag
in debug_set
: return False
110 if dflag
is None and DBG
.TWISTED
in debug_set
: return False
113 print('EXCEPTION (IN BORINGTWISTED CHECK)',
114 traceback
.format_exc(), file=org_stderr
)
@implementer(twisted.logger.ILogFilterPredicate)
class LogNotBoringTwisted:
    """Log filter predicate that rejects "boring" events.

    An event is rejected (PredicateResult.no) when
    logevent_is_boringtwisted() says it is boring; every other event is
    accepted (PredicateResult.yes).
    """

    def __call__(self, event):
        if logevent_is_boringtwisted(event):
            return twisted.logger.PredicateResult.no
        return twisted.logger.PredicateResult.yes
126 #---------- default config ----------
130 max_batch_down = 65536
132 target_requests_outstanding = 3
134 http_timeout_grace = 5
135 max_requests_outstanding = 6
140 ifname_client = hippo%%d
141 ifname_server = shippo%%d
143 #[server] or [<client>] overrides
144 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip,%(ifname)s %(rnets)s
146 # relating to virtual network
149 # addrs = 127.0.0.1 ::1
152 # relating to virtual network
153 vvnetwork = 172.24.230.192
154 # vnetwork = <prefix>/<len>
159 # [<client-ip4-or-ipv6-address>]
160 # password = <password> # used by both, must match
163 max_batch_down = 262144
166 target_requests_outstanding = 10
169 # these need to be defined here so that they can be imported by import *
170 cfg
= ConfigParser(strict
=False)
171 optparser
= OptionParser()
173 _mimetrans
= bytes
.maketrans(b
'-'+slip
.esc
, slip
.esc
+b
'-')
def mime_translate(s):
    """Return `s` with every `-' byte and every SLIP ESC byte exchanged.

    SLIP-encoded packets cannot contain ESC ESC, so once `-' and ESC
    have been swapped the result cannot contain `--'.
    """
    swapped = s.translate(_mimetrans)
    return swapped
183 return 'ConfigResults('+repr(self
.__dict__
)+')'
185 def log_discard(packet
, iface
, saddr
, daddr
, why
):
187 'discarded packet [%s] %s -> %s: %s' %
(iface
, saddr
, daddr
, why
),
190 #---------- packet parsing ----------
192 def packet_addrs(packet
):
193 version
= packet
[0] >> 4
197 factory
= ipaddress
.IPv4Address
201 factory
= ipaddress
.IPv6Address
203 raise ValueError('unsupported IP version %d' % version
)
204 saddr
= factory(packet
[ saddroff
: saddroff
+ addrlen
])
205 daddr
= factory(packet
[ saddroff
+ addrlen
: saddroff
+ addrlen
*2 ])
206 return (saddr
, daddr
)
208 #---------- address handling ----------
212 r
= ipaddress
.IPv4Address(input)
213 except AddressValueError
:
214 r
= ipaddress
.IPv6Address(input)
def ipnetwork(input):
    """Parse `input` as an IP network, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.

    Raises ValueError (or one of its ipaddress subclasses) if `input` is
    not acceptable: either it has a valid IPv4 address part but is
    otherwise a bad IPv4 network, or it is not a valid IPv6 network.
    """
    try:
        return ipaddress.IPv4Network(input)
    except ipaddress.AddressValueError:
        # The address part is not IPv4, so retry as IPv6.  (This used to
        # catch `NetworkValueError', a name that does not exist, so every
        # IPv6 network died with NameError instead of falling back.)
        return ipaddress.IPv6Network(input)
224 #---------- ipif (SLIP) subprocess ----------
226 class SlipStreamDecoder():
227 def __init__(self
, desc
, on_packet
):
229 self
._on_packet
= on_packet
231 self
._log('__init__')
233 def _log(self
, msg
, **kwargs
):
234 log_debug(DBG
.SLIP_FULL
, 'slip %s: %s' %
(self
._desc
, msg
), **kwargs
)
236 def inputdata(self
, data
):
237 self
._log('inputdata', d
=data
)
238 data
= self
._buffer
+ data
240 packets
= slip
.decode(data
, True)
241 self
._buffer
= packets
.pop()
242 for packet
in packets
:
243 self
._maybe_packet(packet
)
244 self
._log('bufremain', d
=self
._buffer
)
246 def _maybe_packet(self
, packet
):
247 self
._log('maybepacket', d
=packet
)
249 self
._on_packet(packet
)
255 packets
= slip
.decode(data
)
256 assert(len(packets
) == 1)
257 self
._maybe_packet(packets
[0])
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    Bytes arriving on the child's stdout are fed to a SlipStreamDecoder.
    Each decoded packet is handed to `router(packet, saddr, daddr)`,
    except that packets with a link-local source or destination are
    discarded (and reported via log_discard).
    """

    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        saddr, daddr = packet_addrs(packet)
        if not (saddr.is_link_local or daddr.is_link_local):
            self._router(packet, saddr, daddr)
            return
        log_discard(packet, 'ipif', saddr, daddr, 'link-local')

    def processEnded(self, status):
        # Turn the end of the subprocess into an exception.
        status.raiseException()
275 def start_ipif(command
, router
):
276 ipif
= _IpifProcessProtocol(router
)
277 reactor
.spawnProcess(ipif
,
278 '/bin/sh',['sh','-xc', command
],
279 childFDs
={0:'w', 1:'r', 2:2},
def queue_inbound(ipif, packet):
    """Write `packet` to ipif's transport, SLIP-encoded.

    The encoded packet is written with a SLIP delimiter before and after
    it, as three separate writes.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    transport = ipif.transport
    transport.write(slip.delimiter)
    transport.write(slip.encode(packet))
    transport.write(slip.delimiter)
289 #---------- packet queue ----------
292 def __init__(self
, desc
, max_queue_time
):
295 self
._max_queue_time
= max_queue_time
296 self
._pq
= collections
.deque() # packets
298 def _log(self
, dflag
, msg
, **kwargs
):
299 log_debug(dflag
, self
._desc
+' pq: '+msg
, **kwargs
)
301 def append(self
, packet
):
302 self
._log(DBG
.QUEUE
, 'append', d
=packet
)
303 self
._pq
.append((time
.monotonic(), packet
))
306 self
._log(DBG
.QUEUE
, 'nonempty ?')
308 try: (queuetime
, packet
) = self
._pq
[0]
310 self
._log(DBG
.QUEUE
, 'nonempty ? empty.')
313 age
= time
.monotonic() - queuetime
314 if age
> self
._max_queue_time
:
315 # strip old packets off the front
316 self
._log(DBG
.QUEUE
, 'dropping (old)', d
=packet
)
320 self
._log(DBG
.QUEUE
, 'nonempty ? nonempty.')
323 def process(self
, sizequery
, moredata
, max_batch
):
324 # sizequery() should return size of batch so far
325 # moredata(s) should add s to batch
326 self
._log(DBG
.QUEUE
, 'process...')
328 try: (dummy
, packet
) = self
._pq
[0]
330 self
._log(DBG
.QUEUE
, 'process... empty')
333 self
._log(DBG
.QUEUE_CTRL
, 'process... packet', d
=packet
)
335 encoded
= slip
.encode(packet
)
338 self
._log(DBG
.QUEUE_CTRL
,
339 'process... (sofar=%d, max=%d) encoded' %
(sofar
, max_batch
),
343 if sofar
+ len(slip
.delimiter
) + len(encoded
) > max_batch
:
344 self
._log(DBG
.QUEUE_CTRL
, 'process... overflow')
346 moredata(slip
.delimiter
)
351 #---------- error handling ----------
358 print('========== CRASH ==========', err
,
359 '===========================', file=sys
.stderr
)
361 except twisted
.internet
.error
.ReactorNotRunning
: pass
def crash_on_defer(defer):
    """Attach an errback to `defer` that passes any failure to crash()."""
    def _to_crash(failure):
        return crash(failure)
    defer.addErrback(_to_crash)
def crash_on_critical(event):
    """Log observer: crash() on any event at critical level or above.

    crash() is given the event formatted by twisted.logger.formatEvent.
    """
    level = event.get('log_level')
    if not (level >= LogLevel.critical):
        return
    crash(twisted.logger.formatEvent(event))
370 #---------- config processing ----------
372 def _cfg_process_putatives():
375 # maps from abstract object to canonical name for cs's
377 def putative(cmap
, abstract
, canoncs
):
379 current_canoncs
= cmap
[abstract
]
383 assert(current_canoncs
== canoncs
)
384 cmap
[abstract
] = canoncs
386 server_pat
= r
'[-.0-9A-Za-z]+'
387 client_pat
= r
'[.:0-9a-f]+'
388 server_re
= regexp
.compile(server_pat
)
389 serverclient_re
= regexp
.compile(
390 server_pat
+ r
' ' + '(?:' + client_pat
+ '|LIMIT)')
392 for cs
in cfg
.sections():
393 log_debug_config('putatives: section [%s]...' %
(cs
))
396 log_debug_config('putatives: section [%s] X ignore: %s' %
(cs
, why
))
397 print('warning: ignoring config section [%s] (%s)' %
(cs
, why
),
400 if cs
== 'LIMIT' or cs
== 'COMMON':
401 # plan A "[LIMIT]" or "[COMMON]"
402 log_debug_config('putatives: section [%s] A ignore' %
(cs
))
406 # plan B "[<client>]" part 1
408 except AddressValueError
:
410 if server_re
.fullmatch(cs
):
411 # plan C "[<servername>]"
412 log_debug_config('putatives: section [%s] C <server>' %
(cs
))
413 putative(servers
, cs
, cs
)
416 if serverclient_re
.fullmatch(cs
):
417 # plan D "[<servername> <client>]" part 1
418 (pss
,pcs
) = cs
.split(' ')
421 # plan E "[<servername> LIMIT]"
422 log_debug_config('putatives: section [%s] E <server> LIMIT' %
(cs
))
426 # plan D "[<servername> <client>]" part 2
428 except AddressValueError
:
429 # plan F "[<some thing we do not understand>]"
430 log_ignore('bad-addr')
433 else: # no AddressValueError
434 # plan D "[<servername> <client>]" part 3
435 log_debug_config('putatives: section [%s] D <server> <client>'
437 putative(clients
, ci
, pcs
)
438 putative(servers
, pss
, pss
)
441 else: # no AddressValueError
      # plan B "[<client>]" part 2
443 log_debug_config('putatives: section [%s] B <client>' %
(cs
))
444 putative(clients
, ci
, cs
)
447 return (servers
, clients
)
def cfg_process_general(c, ss):
    """Set c.mtu to the integer `mtu' setting looked up for section `ss`."""
    mtu = cfg1getint(ss, 'mtu')
    c.mtu = mtu
452 def cfg_process_saddrs(c
, ss
):
454 def __init__(self
, port
, addrspec
):
458 self
.addr
= ipaddress
.IPv4Address(addrspec
)
459 self
._endpointfactory
= twisted
.internet
.endpoints
.TCP4ServerEndpoint
461 except AddressValueError
:
462 self
.addr
= ipaddress
.IPv6Address(addrspec
)
463 self
._endpointfactory
= twisted
.internet
.endpoints
.TCP6ServerEndpoint
464 self
._inurl
= b
'[%s]'
465 def make_endpoint(self
):
466 return self
._endpointfactory(reactor
, self
.port
,
467 interface
= '%s' % self
.addr
)
469 url
= b
'http://' + (self
._inurl % str
(self
.addr
).encode('ascii'))
470 if self
.port
!= 80: url
+= b
':%d' % self
.port
474 return 'ServerAddr'+repr((self
.port
,self
.addr
))
476 c
.port
= cfg1getint(ss
,'port')
478 for addrspec
in cfg1get(ss
, 'addrs').split():
479 sa
= ServerAddr(c
.port
, addrspec
)
def cfg_process_vnetwork(c, ss):
    """Set c.vnetwork from the `vnetwork' setting for section `ss`.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    """
    network = ipnetwork(cfg1get(ss, 'vnetwork'))
    c.vnetwork = network
    if network.num_addresses >= 3 + 2:
        return
    raise ValueError('vnetwork needs at least 2^3 addresses')
def cfg_process_vaddr(c, ss):
    """Set c.vaddr for section `ss`.

    Uses the `vaddr' setting if there is one.  Otherwise processes the
    vnetwork setting (which sets c.vnetwork) and takes the first host
    address of that network.
    """
    try:
        vaddr = cfg1get(ss, 'vaddr')
    except NoOptionError:
        cfg_process_vnetwork(c, ss)
        vaddr = next(c.vnetwork.hosts())
    c.vaddr = vaddr
def cfg_search_section(key, sections):
    """Return the first entry of `sections` in which cfg has option `key`.

    Raises NoOptionError if none of the sections has it.
    """
    not_found = object()
    candidates = (name for name in sections if cfg.has_option(name, key))
    hit = next(candidates, not_found)
    if hit is not_found:
        raise NoOptionError(key, repr(sections))
    return hit
def cfg_get_raw(*args, **kwargs):
    """Call cfg.get with raw=True; intended as a getter for cfg_search."""
    value = cfg.get(*args, raw=True, **kwargs)
    return value
def cfg_search(getter, key, sections):
    """Find the first of `sections` that has `key` and return
    getter(that_section, key).

    Raises NoOptionError (from cfg_search_section) if no section has it.
    """
    return getter(cfg_search_section(key, sections), key)
def cfg1get(section, key, getter=cfg.get, **kwargs):
    """Look up `key` in `section`, falling back to the COMMON section.

    The value is fetched with getter(found_section, key, **kwargs).
    Raises NoOptionError if neither section has the option.
    """
    where = cfg_search_section(key, [section, 'COMMON'])
    value = getter(where, key, **kwargs)
    return value
def cfg1getint(section, key, **kwargs):
    """Like cfg1get, but fetches the value with cfg.getint."""
    number = cfg1get(section, key, getter=cfg.getint, **kwargs)
    return number
def cfg_process_client_limited(cc, ss, sections, key):
    """Store integer setting `key` on `cc`, capped by the applicable limit.

    The value is searched for in `sections`; the cap is searched for in
    "<ss> LIMIT" and then "LIMIT".  The smaller of the two is stored as
    attribute `key` in cc's instance dictionary.
    """
    requested = cfg_search(cfg1getint, key, sections)
    ceiling = cfg_search(cfg1getint, key, ['%s LIMIT' % ss, 'LIMIT'])
    vars(cc)[key] = min(requested, ceiling)
520 def cfg_process_client_common(cc
,ss
,cs
,ci
):
521 # returns sections to search in, iff password is defined, otherwise None
524 sections
= ['%s %s' %
(ss
,cs
),
529 try: pwsection
= cfg_search_section('password', sections
)
530 except NoOptionError
: return None
532 pw
= cfg1get(pwsection
, 'password')
533 cc
.password
= pw
.encode('utf-8')
535 cfg_process_client_limited(cc
,ss
,sections
,'target_requests_outstanding')
536 cfg_process_client_limited(cc
,ss
,sections
,'http_timeout')
540 def cfg_process_ipif(c
, sections
, varmap
):
542 try: v
= getattr(c
, s
)
543 except AttributeError: continue
546 #print('CFGIPIF',repr((varmap, sections, c.__dict__)),file=sys.stderr)
548 section
= cfg_search_section('ipif', sections
)
549 c
.ipif_command
= cfg1get(section
,'ipif', vars=c
.__dict__
)
551 #---------- startup ----------
def log_debug_config(m):
    """Print `m` with a `DBG.CONFIG:' prefix if DBG.CONFIG is in debug_set."""
    if DBG.CONFIG in debug_set:
        print('DBG.CONFIG:', m)
557 def common_startup(process_cfg
):
  # calls process_cfg(opts, putative_servers, putative_clients)
560 # ConfigParser hates #-comments after values
561 trailingcomments_re
= regexp
.compile(r
'#.*')
562 cfg
.read_string(trailingcomments_re
.sub('', defcfg
))
565 def readconfig(pathname
, mandatory
=True):
566 def log(m
, p
=pathname
):
567 if not DBG
.CONFIG
in debug_set
: return
568 log_debug_config('%s: %s' %
(m
, p
))
571 files
= os
.listdir(pathname
)
573 except FileNotFoundError
:
578 except NotADirectoryError
:
585 re
= regexp
.compile('[^-A-Za-z0-9_]')
586 for f
in os
.listdir(pathname
):
587 if re
.search(f
): continue
588 subpath
= pathname
+ '/' + f
591 except FileNotFoundError
:
592 log('entry skipped', subpath
)
595 log('entry read', subpath
)
597 def oc_config(od
,os
, value
, op
):
602 def oc_extra_config(od
,os
, value
, op
):
605 def read_defconfig():
606 readconfig('/etc/hippotat/config.d', False)
607 readconfig('/etc/hippotat/passwords.d', False)
608 readconfig('/etc/hippotat/master.cfg', False)
610 def oc_defconfig(od
,os
, value
, op
):
613 read_defconfig(value
)
615 def dfs_less_detailed(dl
):
616 return [df
for df
in DBG
.iterconstants() if df
<= dl
]
618 def ds_default(od
,os
,dl
,op
):
621 debug_set |
= set(dfs_less_detailed(debug_def_detail
))
623 def ds_select(od
,os
, spec
, op
):
624 for it
in spec
.split(','):
626 if it
.startswith('-'):
627 mutator
= debug_set
.discard
630 mutator
= debug_set
.add
633 dfs
= DBG
.iterconstants()
637 mapper
= dfs_less_detailed
640 mapper
= lambda x
: [x
]
643 dfspec
= DBG
.lookupByName(it
)
645 optparser
.error('unknown debug flag %s in --debug-select' % it
)
652 optparser
.add_option('-D', '--debug',
655 help='enable default debug (to stdout)',
656 callback
= ds_default
)
658 optparser
.add_option('--debug-select',
661 metavar
='[-]DFLAG[+]|[-]+,...',
663 '''enable (`-': disable) each specified DFLAG;
664 `+': do same for all "more interesting" DFLAGSs;
665 just `+': all DFLAGs.
666 DFLAGS: ''' + ' '.join([df
.name
for df
in DBG
.iterconstants()]),
670 optparser
.add_option('-c', '--config',
673 metavar
='CONFIGFILE',
678 optparser
.add_option('--extra-config',
681 metavar
='CONFIGFILE',
684 callback
= oc_extra_config
)
686 optparser
.add_option('--default-config',
688 callback
= oc_defconfig
)
690 (opts
, args
) = optparser
.parse_args()
691 if len(args
): optparser
.error('no non-option arguments please')
697 (pss
, pcs
) = _cfg_process_putatives()
698 process_cfg(opts
, pss
, pcs
)
699 except (configparser
.Error
, ValueError):
700 traceback
.print_exc(file=sys
.stderr
)
701 print('\nInvalid configuration, giving up.', file=sys
.stderr
)
705 #print('X', debug_set, file=sys.stderr)
707 log_formatter
= twisted
.logger
.formatEventAsClassicLogText
708 stdout_obs
= twisted
.logger
.FileLogObserver(sys
.stdout
, log_formatter
)
709 stderr_obs
= twisted
.logger
.FileLogObserver(sys
.stderr
, log_formatter
)
710 pred
= twisted
.logger
.LogLevelFilterPredicate(LogLevel
.error
)
711 stdsomething_obs
= twisted
.logger
.FilteringLogObserver(
712 stderr_obs
, [pred
], stdout_obs
714 global file_log_observer
715 file_log_observer
= twisted
.logger
.FilteringLogObserver(
716 stdsomething_obs
, [LogNotBoringTwisted()]
718 #log_observer = stdsomething_obs
719 twisted
.logger
.globalLogBeginner
.beginLoggingTo(
720 [ file_log_observer
, crash_on_critical
]
724 log_debug(DBG
.INIT
, 'entering reactor')
725 if not _crashing
: reactor
.run()
726 print('ENDED', file=sys
.stderr
)