3 # Hippotat - Asinine IP Over HTTP program
4 # hippotatlib/__init__.py - common library code
6 # Copyright 2017 Ian Jackson
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 # You should have received a copy of the GNU General Public License
21 # along with this program, in the file GPLv3. If not,
22 # see <http://www.gnu.org/licenses/>.
26 signal
.signal(signal
.SIGINT
, signal
.SIG_DFL
)
31 from zope
.interface
import implementer
34 from twisted
.internet
import reactor
35 import twisted
.internet
.endpoints
37 from twisted
.logger
import LogLevel
38 import twisted
.python
.constants
39 from twisted
.python
.constants
import NamedConstant
42 from ipaddress
import AddressValueError
44 from optparse
import OptionParser
46 from configparser
import ConfigParser
47 from configparser
import NoOptionError
49 from functools
import partial
58 import hippotatlib
.slip
as slip
60 class DBG(twisted
.python
.constants
.Names
):
61 INIT
= NamedConstant()
62 CONFIG
= NamedConstant()
63 ROUTE
= NamedConstant()
64 DROP
= NamedConstant()
65 OWNSOURCE
= NamedConstant()
66 FLOW
= NamedConstant()
67 HTTP
= NamedConstant()
68 TWISTED
= NamedConstant()
69 QUEUE
= NamedConstant()
70 HTTP_CTRL
= NamedConstant()
71 QUEUE_CTRL
= NamedConstant()
72 HTTP_FULL
= NamedConstant()
73 CTRL_DUMP
= NamedConstant()
74 SLIP_FULL
= NamedConstant()
75 DATA_COMPLETE
= NamedConstant()
77 _hex_codec
= codecs
.getencoder('hex_codec')
79 #---------- logging ----------
81 org_stderr
= sys
.stderr
83 log
= twisted
.logger
.Logger()
86 debug_def_detail
= DBG
.HTTP
88 def log_debug(dflag
, msg
, idof
=None, d
=None):
89 if dflag
not in debug_set
: return
90 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
92 msg
= '[%#x] %s' %
(id(idof
), msg
)
95 if not DBG
.DATA_COMPLETE
in debug_set
:
99 d
= _hex_codec(d
)[0].decode('ascii')
100 msg
+= ' ' + d
+ trunc
101 log
.info('{dflag} {msgcore}', dflag
=dflag
, msgcore
=msg
)
103 def logevent_is_boringtwisted(event
):
105 if event
.get('log_level') != LogLevel
.info
:
107 dflag
= event
.get('dflag')
108 if dflag
is False : return False
109 if dflag
in debug_set
: return False
110 if dflag
is None and DBG
.TWISTED
in debug_set
: return False
113 print(traceback
.format_exc(), file=org_stderr
)
116 @implementer(twisted
.logger
.ILogFilterPredicate
)
117 class LogNotBoringTwisted
:
118 def __call__(self
, event
):
120 twisted
.logger
.PredicateResult
.no
121 if logevent_is_boringtwisted(event
) else
122 twisted
.logger
.PredicateResult
.yes
125 #---------- default config ----------
129 max_batch_down = 65536
131 target_requests_outstanding = 3
133 http_timeout_grace = 5
134 max_requests_outstanding = 6
140 #[server] or [<client>] overrides
141 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
143 # relating to virtual network
146 # addrs = 127.0.0.1 ::1
149 # relating to virtual network
150 vvnetwork = 172.24.230.192
151 # vnetwork = <prefix>/<len>
156 # [<client-ip4-or-ipv6-address>]
157 # password = <password> # used by both, must match
160 max_batch_down = 262144
163 target_requests_outstanding = 10
166 # these need to be defined here so that they can be imported by import *
167 cfg
= ConfigParser(strict
=False)
168 optparser
= OptionParser()
170 _mimetrans
= bytes
.maketrans(b
'-'+slip
.esc
, slip
.esc
+b
'-')
171 def mime_translate(s
):
172 # SLIP-encoded packets cannot contain ESC ESC.
173 # Swap `-' and ESC. The result cannot contain `--'
174 return s
.translate(_mimetrans
)
180 return 'ConfigResults('+repr(self
.__dict__
)+')'
182 def log_discard(packet
, iface
, saddr
, daddr
, why
):
184 'discarded packet [%s] %s -> %s: %s' %
(iface
, saddr
, daddr
, why
),
187 #---------- packet parsing ----------
189 def packet_addrs(packet
):
190 version
= packet
[0] >> 4
194 factory
= ipaddress
.IPv4Address
198 factory
= ipaddress
.IPv6Address
200 raise ValueError('unsupported IP version %d' % version
)
201 saddr
= factory(packet
[ saddroff
: saddroff
+ addrlen
])
202 daddr
= factory(packet
[ saddroff
+ addrlen
: saddroff
+ addrlen
*2 ])
203 return (saddr
, daddr
)
205 #---------- address handling ----------
209 r
= ipaddress
.IPv4Address(input)
210 except AddressValueError
:
211 r
= ipaddress
.IPv6Address(input)
214 def ipnetwork(input):
216 r
= ipaddress
.IPv4Network(input)
217 except NetworkValueError
:
218 r
= ipaddress
.IPv6Network(input)
221 #---------- ipif (SLIP) subprocess ----------
223 class SlipStreamDecoder():
224 def __init__(self
, desc
, on_packet
):
226 self
._on_packet
= on_packet
228 self
._log('__init__')
230 def _log(self
, msg
, **kwargs
):
231 log_debug(DBG
.SLIP_FULL
, 'slip %s: %s' %
(self
._desc
, msg
), **kwargs
)
233 def inputdata(self
, data
):
234 self
._log('inputdata', d
=data
)
235 data
= self
._buffer
+ data
237 packets
= slip
.decode(data
, True)
238 self
._buffer
= packets
.pop()
239 for packet
in packets
:
240 self
._maybe_packet(packet
)
241 self
._log('bufremain', d
=self
._buffer
)
243 def _maybe_packet(self
, packet
):
244 self
._log('maybepacket', d
=packet
)
246 self
._on_packet(packet
)
252 packets
= slip
.decode(data
)
253 assert(len(packets
) == 1)
254 self
._maybe_packet(packets
[0])
256 class _IpifProcessProtocol(twisted
.internet
.protocol
.ProcessProtocol
):
257 def __init__(self
, router
):
258 self
._router
= router
259 self
._decoder
= SlipStreamDecoder('ipif', self
.slip_on_packet
)
260 def connectionMade(self
): pass
261 def outReceived(self
, data
):
262 self
._decoder
.inputdata(data
)
263 def slip_on_packet(self
, packet
):
264 (saddr
, daddr
) = packet_addrs(packet
)
265 if saddr
.is_link_local
or daddr
.is_link_local
:
266 log_discard(packet
, 'ipif', saddr
, daddr
, 'link-local')
268 self
._router(packet
, saddr
, daddr
)
269 def processEnded(self
, status
):
270 status
.raiseException()
272 def start_ipif(command
, router
):
273 ipif
= _IpifProcessProtocol(router
)
274 reactor
.spawnProcess(ipif
,
275 '/bin/sh',['sh','-xc', command
],
276 childFDs
={0:'w', 1:'r', 2:2},
280 def queue_inbound(ipif
, packet
):
281 log_debug(DBG
.FLOW
, "queue_inbound", d
=packet
)
282 ipif
.transport
.write(slip
.delimiter
)
283 ipif
.transport
.write(slip
.encode(packet
))
284 ipif
.transport
.write(slip
.delimiter
)
286 #---------- packet queue ----------
289 def __init__(self
, desc
, max_queue_time
):
292 self
._max_queue_time
= max_queue_time
293 self
._pq
= collections
.deque() # packets
295 def _log(self
, dflag
, msg
, **kwargs
):
296 log_debug(dflag
, self
._desc
+' pq: '+msg
, **kwargs
)
298 def append(self
, packet
):
299 self
._log(DBG
.QUEUE
, 'append', d
=packet
)
300 self
._pq
.append((time
.monotonic(), packet
))
303 self
._log(DBG
.QUEUE
, 'nonempty ?')
305 try: (queuetime
, packet
) = self
._pq
[0]
307 self
._log(DBG
.QUEUE
, 'nonempty ? empty.')
310 age
= time
.monotonic() - queuetime
311 if age
> self
._max_queue_time
:
312 # strip old packets off the front
313 self
._log(DBG
.QUEUE
, 'dropping (old)', d
=packet
)
317 self
._log(DBG
.QUEUE
, 'nonempty ? nonempty.')
320 def process(self
, sizequery
, moredata
, max_batch
):
321 # sizequery() should return size of batch so far
322 # moredata(s) should add s to batch
323 self
._log(DBG
.QUEUE
, 'process...')
325 try: (dummy
, packet
) = self
._pq
[0]
327 self
._log(DBG
.QUEUE
, 'process... empty')
330 self
._log(DBG
.QUEUE_CTRL
, 'process... packet', d
=packet
)
332 encoded
= slip
.encode(packet
)
335 self
._log(DBG
.QUEUE_CTRL
,
336 'process... (sofar=%d, max=%d) encoded' %
(sofar
, max_batch
),
340 if sofar
+ len(slip
.delimiter
) + len(encoded
) > max_batch
:
341 self
._log(DBG
.QUEUE_CTRL
, 'process... overflow')
343 moredata(slip
.delimiter
)
348 #---------- error handling ----------
355 print('========== CRASH ==========', err
,
356 '===========================', file=sys
.stderr
)
358 except twisted
.internet
.error
.ReactorNotRunning
: pass
360 def crash_on_defer(defer
):
361 defer
.addErrback(lambda err
: crash(err
))
363 def crash_on_critical(event
):
364 if event
.get('log_level') >= LogLevel
.critical
:
365 crash(twisted
.logger
.formatEvent(event
))
367 #---------- config processing ----------
369 def _cfg_process_putatives():
372 # maps from abstract object to canonical name for cs's
374 def putative(cmap
, abstract
, canoncs
):
376 current_canoncs
= cmap
[abstract
]
380 assert(current_canoncs
== canoncs
)
381 cmap
[abstract
] = canoncs
383 server_pat
= r
'[-.0-9A-Za-z]+'
384 client_pat
= r
'[.:0-9a-f]+'
385 server_re
= regexp
.compile(server_pat
)
386 serverclient_re
= regexp
.compile(server_pat
+ r
' ' + client_pat
)
388 for cs
in cfg
.sections():
394 # plan B "[<client>]" part 1
396 except AddressValueError
:
398 if server_re
.fullmatch(cs
):
399 # plan C "[<servername>]"
400 putative(servers
, cs
, cs
)
403 if serverclient_re
.fullmatch(cs
):
404 # plan D "[<servername> <client>]" part 1
405 (pss
,pcs
) = cs
.split(' ')
408 # plan E "[<servername> LIMIT]"
412 # plan D "[<servername> <client>]" part 2
414 except AddressValueError
:
415 # plan F "[<some thing we do not understand>]"
416 # well, we ignore this
417 print('warning: ignoring config section %s' % cs
, file=sys
.stderr
)
420 else: # no AddressValueError
421 # plan D "[<servername> <client]" part 3
422 putative(clients
, ci
, pcs
)
423 putative(servers
, pss
, pss
)
426 else: # no AddressValueError
427 # plan B "[<client>" part 2
428 putative(clients
, ci
, cs
)
431 return (servers
, clients
)
433 def cfg_process_common(c
, ss
):
434 c
.mtu
= cfg
.getint(ss
, 'mtu')
436 def cfg_process_saddrs(c
, ss
):
438 def __init__(self
, port
, addrspec
):
442 self
.addr
= ipaddress
.IPv4Address(addrspec
)
443 self
._endpointfactory
= twisted
.internet
.endpoints
.TCP4ServerEndpoint
445 except AddressValueError
:
446 self
.addr
= ipaddress
.IPv6Address(addrspec
)
447 self
._endpointfactory
= twisted
.internet
.endpoints
.TCP6ServerEndpoint
448 self
._inurl
= b
'[%s]'
449 def make_endpoint(self
):
450 return self
._endpointfactory(reactor
, self
.port
,
451 interface
= '%s' % self
.addr
)
453 url
= b
'http://' + (self
._inurl % str
(self
.addr
).encode('ascii'))
454 if self
.port
!= 80: url
+= b
':%d' % self
.port
458 return 'ServerAddr'+repr((self
.port
,self
.addr
))
460 c
.port
= cfg
.getint(ss
,'port')
462 for addrspec
in cfg
.get(ss
, 'addrs').split():
463 sa
= ServerAddr(c
.port
, addrspec
)
466 def cfg_process_vnetwork(c
, ss
):
467 c
.vnetwork
= ipnetwork(cfg
.get(ss
,'vnetwork'))
468 if c
.vnetwork
.num_addresses
< 3 + 2:
469 raise ValueError('vnetwork needs at least 2^3 addresses')
471 def cfg_process_vaddr(c
, ss
):
473 c
.vaddr
= cfg
.get(ss
,'vaddr')
474 except NoOptionError
:
475 cfg_process_vnetwork(c
, ss
)
476 c
.vaddr
= next(c
.vnetwork
.hosts())
478 def cfg_search_section(key
,sections
):
479 for section
in sections
:
480 if cfg
.has_option(section
, key
):
482 raise NoOptionError(key
, repr(sections
))
484 def cfg_search(getter
,key
,sections
):
485 section
= cfg_search_section(key
,sections
)
486 return getter(section
, key
)
488 def cfg_process_client_limited(cc
,ss
,sections
,key
):
489 val
= cfg_search(cfg
.getint
, key
, sections
)
490 lim
= cfg_search(cfg
.getint
, key
, ['%s LIMIT' % ss
, 'LIMIT'])
491 cc
.__dict__
[key
] = min(val
,lim
)
493 def cfg_process_client_common(cc
,ss
,cs
,ci
):
494 # returns sections to search in, iff password is defined, otherwise None
497 sections
= ['%s %s' %
(ss
,cs
),
502 try: pwsection
= cfg_search_section('password', sections
)
503 except NoOptionError
: return None
505 pw
= cfg
.get(pwsection
, 'password')
506 cc
.password
= pw
.encode('utf-8')
508 cfg_process_client_limited(cc
,ss
,sections
,'target_requests_outstanding')
509 cfg_process_client_limited(cc
,ss
,sections
,'http_timeout')
513 def cfg_process_ipif(c
, sections
, varmap
):
515 try: v
= getattr(c
, s
)
516 except AttributeError: continue
519 #print('CFGIPIF',repr((varmap, sections, c.__dict__)),file=sys.stderr)
521 section
= cfg_search_section('ipif', sections
)
522 c
.ipif_command
= cfg
.get(section
,'ipif', vars=c
.__dict__
)
524 #---------- startup ----------
526 def log_debug_config(m
):
527 if not DBG
.CONFIG
in debug_set
: return
528 print('DBG.CONFIG:', m
)
530 def common_startup(process_cfg
):
531 # calls process_cfg(putative_clients, putative_servers)
533 # ConfigParser hates #-comments after values
534 trailingcomments_re
= regexp
.compile(r
'#.*')
535 cfg
.read_string(trailingcomments_re
.sub('', defcfg
))
538 def readconfig(pathname
, mandatory
=True):
539 def log(m
, p
=pathname
):
540 if not DBG
.CONFIG
in debug_set
: return
541 log_debug_config('%s: %s' %
(m
, pathname
))
544 files
= os
.listdir(pathname
)
546 except FileNotFoundError
:
551 except NotADirectoryError
:
558 re
= regexp
.compile('[^-A-Za-z0-9_]')
559 for f
in os
.listdir(pathname
):
560 if re
.search(f
): continue
561 subpath
= pathname
+ '/' + f
564 except FileNotFoundError
:
565 log('entry skipped', subpath
)
568 log('entry read', subpath
)
570 def oc_config(od
,os
, value
, op
):
575 def oc_extra_config(od
,os
, value
, op
):
578 def read_defconfig():
579 readconfig('/etc/hippotat/config.d', False)
580 readconfig('/etc/hippotat/passwords.d', False)
581 readconfig('/etc/hippotat/master.cfg', False)
583 def oc_defconfig(od
,os
, value
, op
):
586 read_defconfig(value
)
588 def dfs_less_detailed(dl
):
589 return [df
for df
in DBG
.iterconstants() if df
<= dl
]
591 def ds_default(od
,os
,dl
,op
):
594 debug_set |
= set(dfs_less_detailed(debug_def_detail
))
596 def ds_select(od
,os
, spec
, op
):
597 for it
in spec
.split(','):
599 if it
.startswith('-'):
600 mutator
= debug_set
.discard
603 mutator
= debug_set
.add
606 dfs
= DBG
.iterconstants()
610 mapper
= dfs_less_detailed
613 mapper
= lambda x
: [x
]
616 dfspec
= DBG
.lookupByName(it
)
618 optparser
.error('unknown debug flag %s in --debug-select' % it
)
625 optparser
.add_option('-D', '--debug',
628 help='enable default debug (to stdout)',
629 callback
= ds_default
)
631 optparser
.add_option('--debug-select',
634 metavar
='[-]DFLAG[+]|[-]+,...',
636 '''enable (`-': disable) each specified DFLAG;
637 `+': do same for all "more interesting" DFLAGSs;
638 just `+': all DFLAGs.
639 DFLAGS: ''' + ' '.join([df
.name
for df
in DBG
.iterconstants()]),
643 optparser
.add_option('-c', '--config',
646 metavar
='CONFIGFILE',
651 optparser
.add_option('--extra-config',
654 metavar
='CONFIGFILE',
657 callback
= oc_extra_config
)
659 optparser
.add_option('--default-config',
661 callback
= oc_defconfig
)
663 (opts
, args
) = optparser
.parse_args()
664 if len(args
): optparser
.error('no non-option arguments please')
670 (pss
, pcs
) = _cfg_process_putatives()
671 process_cfg(opts
, pss
, pcs
)
672 except (configparser
.Error
, ValueError):
673 traceback
.print_exc(file=sys
.stderr
)
674 print('\nInvalid configuration, giving up.', file=sys
.stderr
)
678 #print('X', debug_set, file=sys.stderr)
680 log_formatter
= twisted
.logger
.formatEventAsClassicLogText
681 stdout_obs
= twisted
.logger
.FileLogObserver(sys
.stdout
, log_formatter
)
682 stderr_obs
= twisted
.logger
.FileLogObserver(sys
.stderr
, log_formatter
)
683 pred
= twisted
.logger
.LogLevelFilterPredicate(LogLevel
.error
)
684 stdsomething_obs
= twisted
.logger
.FilteringLogObserver(
685 stderr_obs
, [pred
], stdout_obs
687 global file_log_observer
688 file_log_observer
= twisted
.logger
.FilteringLogObserver(
689 stdsomething_obs
, [LogNotBoringTwisted()]
691 #log_observer = stdsomething_obs
692 twisted
.logger
.globalLogBeginner
.beginLoggingTo(
693 [ file_log_observer
, crash_on_critical
]
697 log_debug(DBG
.INIT
, 'entering reactor')
698 if not _crashing
: reactor
.run()
699 print('ENDED', file=sys
.stderr
)