try: from gi.repository import GLib as G
except ImportError: import gobject as G
from struct import pack, unpack
+from cStringIO import StringIO
SM = T.svcmgr
##__import__('rmcr').__debug = True
def __init__(me, **kw):
me.__dict__.update(kw)
+def loadb(s):
+ n = 0
+ for ch in s: n = 256*n + ord(ch)
+ return n
+
+def storeb(n, wd = None):
+ if wd is None: wd = n.bit_length()
+ s = StringIO()
+ for i in xrange((wd - 1)&-8, -8, -8): s.write(chr((n >> i)&0xff))
+ return s.getvalue()
+
###--------------------------------------------------------------------------
### Address manipulation.
+###
+### I think this is the most demanding application, in terms of address
+### hacking, in the entire TrIPE suite. At least we don't have to do it in
+### C.
-class InetAddress (object):
- AF = S.AF_INET
- AFNAME = 'IPv4'
+class BaseAddress (object):
def __init__(me, addrstr, maskstr = None):
- me.addr = me._addrstr_to_int(addrstr)
+ me._setaddr(addrstr)
if maskstr is None:
me.mask = -1
elif maskstr.isdigit():
- me.mask = (1 << 32) - (1 << 32 - int(maskstr))
+ me.mask = (1 << me.NBITS) - (1 << me.NBITS - int(maskstr))
else:
- me.mask = me._addrstr_to_int(maskstr)
+ me._setmask(maskstr)
if me.addr&~me.mask:
raise ValueError('network contains bits set beyond mask')
def _addrstr_to_int(me, addrstr):
- return unpack('>L', S.inet_aton(addrstr))[0]
+ try: return loadb(S.inet_pton(me.AF, addrstr))
+ except S.error: raise ValueError('bad address syntax')
def _int_to_addrstr(me, n):
- return S.inet_ntoa(pack('>L', n))
+ return S.inet_ntop(me.AF, storeb(me.addr, me.NBITS))
+ def _setmask(me, maskstr):
+ raise ValueError('only prefix masked supported')
+ def _maskstr(me):
+ raise ValueError('only prefix masked supported')
def sockaddr(me, port = 0):
if me.mask != -1: raise ValueError('not a simple address')
- return me._int_to_addrstr(me.addr), port
+ return me._sockaddr(port)
def __str__(me):
- addrstr = me._int_to_addrstr(me.addr)
+ addrstr = me._addrstr()
if me.mask == -1:
return addrstr
else:
- inv = me.mask ^ ((1 << 32) - 1)
+ inv = me.mask ^ ((1 << me.NBITS) - 1)
if (inv&(inv + 1)) == 0:
- return '%s/%d' % (addrstr, 32 - inv.bit_length())
+ return '%s/%d' % (addrstr, me.NBITS - inv.bit_length())
else:
- return '%s/%s' % (addrstr, me._int_to_addrstr(me.mask))
+ return '%s/%s' % (addrstr, me._maskstr())
def withinp(me, net):
+ if type(net) != type(me): return False
if (me.mask&net.mask) != net.mask: return False
if (me.addr ^ net.addr)&net.mask: return False
- return True
+ return me._withinp(net)
def eq(me, other):
+ if type(me) != type(other): return False
if me.mask != other.mask: return False
if me.addr != other.addr: return False
+ return me._eq(other)
+ def _withinp(me, net):
+ return True
+ def _eq(me, other):
return True
+
+class InetAddress (BaseAddress):
+ AF = S.AF_INET
+ AFNAME = 'IPv4'
+ NBITS = 32
+ def _addrstr_to_int(me, addrstr):
+ try: return loadb(S.inet_aton(addrstr))
+ except S.error: raise ValueError('bad address syntax')
+ def _setaddr(me, addrstr):
+ me.addr = me._addrstr_to_int(addrstr)
+ def _setmask(me, maskstr):
+ me.mask = me._addrstr_to_int(maskstr)
+ def _addrstr(me):
+ return me._int_to_addrstr(me.addr)
+ def _maskstr(me):
+ return me._int_to_addrstr(me.mask)
+ def _sockaddr(me, port = 0):
+ return (me._addrstr(), port)
@classmethod
def from_sockaddr(cls, sa):
addr, port = (lambda a, p: (a, p))(*sa)
return cls(addr), port
+class Inet6Address (BaseAddress):
+ AF = S.AF_INET6
+ AFNAME = 'IPv6'
+ NBITS = 128
+ def _setaddr(me, addrstr):
+ pc = addrstr.find('%')
+ if pc == -1:
+ me.addr = me._addrstr_to_int(addrstr)
+ me.scope = 0
+ else:
+ me.addr = me._addrstr_to_int(addrstr[:pc])
+ ais = S.getaddrinfo(addrstr, 0, S.AF_INET6, S.SOCK_DGRAM, 0,
+ S.AI_NUMERICHOST | S.AI_NUMERICSERV)
+ me.scope = ais[0][4][3]
+ def _addrstr(me):
+ addrstr = me._int_to_addrstr(me.addr)
+ if me.scope == 0:
+ return addrstr
+ else:
+ name, _ = S.getnameinfo((addrstr, 0, 0, me.scope),
+ S.NI_NUMERICHOST | S.NI_NUMERICSERV)
+ return name
+ def _sockaddr(me, port = 0):
+ return (me._addrstr(), port, 0, me.scope)
+ @classmethod
+ def from_sockaddr(cls, sa):
+ addr, port, _, scope = (lambda a, p, f = 0, s = 0: (a, p, f, s))(*sa)
+ me = cls(addr)
+ me.scope = scope
+ return me, port
+ def _withinp(me, net):
+ return net.scope == 0 or me.scope == net.scope
+ def _eq(me, other):
+ return me.scope == other.scope
+
def parse_address(addrstr, maskstr = None):
- return InetAddress(addrstr, maskstr)
+ if addrstr.find(':') >= 0: return Inet6Address(addrstr, maskstr)
+ else: return InetAddress(addrstr, maskstr)
def parse_net(netstr):
try: sl = netstr.index('/')
## this service are largely going to be satellite notes, I don't think
## scalability's going to be a problem.
-TESTADDRS = [InetAddress('1.2.3.4')]
+TESTADDRS = [InetAddress('1.2.3.4'), Inet6Address('2001::1')]
CONFSYNTAX = [
('COMMENT', RX.compile(r'^\s*($|[;#])')),