X-Git-Url: https://git.distorted.org.uk/~mdw/mLib-python/blobdiff_plain/20bce5e92b01cd928f26b61be78215117039c561..579d01693c86259110fe7a2c2a6f005f1bdbad5b:/ident.pyx diff --git a/ident.pyx b/ident.pyx new file mode 100644 index 0000000..d19c3a4 --- /dev/null +++ b/ident.pyx @@ -0,0 +1,171 @@ +# -*-pyrex-*- +# +# $Id$ +# +# Ident client +# +# (c) 2005 Straylight/Edgeware +# + +#----- Licensing notice ----------------------------------------------------- +# +# This file is part of the Python interface to mLib. +# +# mLib/Python is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# mLib/Python is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with mLib/Python; if not, write to the Free Software Foundation, +# Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
import socket

#----- Address conversion ---------------------------------------------------

cdef _inaddr_frompy(sockaddr_in *sin, addr):
  # Fill in SIN from the Python pair ADDR = (HOST, PORT).
  #
  # HOST is handed to `inet_aton'; PORT must satisfy 0 <= PORT < 65536.
  # Raises `TypeError' if ADDR doesn't have exactly two elements, if
  # `inet_aton' rejects HOST, or if PORT is out of range.
  cdef int port
  if len(addr) != 2:
    raise TypeError, 'want address/port pair'
  a = addr[0]
  if not inet_aton(a, &sin.sin_addr):
    raise TypeError, 'bad IP address'
  port = addr[1]
  if not (0 <= port < 65536):
    raise TypeError, 'port number out of range'
  # Callers pass an uninitialized structure from their stack, so the address
  # family has to be set here rather than left as whatever was lying about.
  sin.sin_family = AF_INET
  sin.sin_port = htons(port)

cdef _inaddr_topy(sockaddr_in *sin):
  # Convert SIN into a Python pair (HOST, PORT), with the port in host byte
  # order.
  return inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)

#----- Ident requests -------------------------------------------------------

cdef class SelIdentify:
  """
  An ident request, started as soon as the object is constructed.

  SK is either a socket object, whose local and peer addresses are read
  with `getsockname' and `getpeername', or a pair (LOCAL, REMOTE) of
  (HOST, PORT) pairs.

  The outcome is reported by calling one of the methods below; the default
  implementations forward to the optional handler procedures:

    * user(OS, USER) calls userproc(OS, USER);
    * bad() calls badproc();
    * error(ERROR) calls errorproc(ERROR);
    * failed(ERRNO, STRERROR) calls failedproc(ERRNO, STRERROR) -- note
      that the completion handler in this file never calls `failed'; and
    * bogus() calls bogusproc(), and is the fallback when `badproc',
      `errorproc' or `failedproc' is unset.

  The `dead' method is called (with no arguments) whenever the request
  stops being active, before any of the above.

  There is deliberately no `eofproc' property: the class has no `_eof'
  slot and nothing here ever reports an end-of-file event.
  """

  cdef ident_request irq
  cdef int _activep
  cdef readonly localaddr
  cdef readonly remoteaddr
  cdef _user
  cdef _bad
  cdef _error
  cdef _failed
  cdef _bogus

  def __new__(me, sk,
              userproc = None, bogusproc = None,
              badproc = None, errorproc = None, failedproc = None,
              *hunoz, **hukairz):
    cdef sockaddr_in s_in, s_out
    cdef size_t sz_in, sz_out
    cdef int fd

    # Work out the two endpoint addresses.
    if PyObject_TypeCheck(sk, socket.SocketType):
      fd = sk.fileno()
      sz_in = PSIZEOF(&s_in)
      sz_out = PSIZEOF(&s_out)
      if getsockname(fd, &s_in, &sz_in) or \
         getpeername(fd, &s_out, &sz_out):
        _oserror()                      # presumably raises from `errno'
      if s_in.sin_family != AF_INET or s_out.sin_family != AF_INET:
        raise TypeError, 'must be internet socket'
    elif len(sk) != 2:
      raise TypeError, 'want pair of addresses'
    else:
      _inaddr_frompy(&s_in, sk[0])
      _inaddr_frompy(&s_out, sk[1])

    # Do everything which might raise an exception /before/ starting the
    # request.  Once `ident' has been called, the library holds a pointer
    # to this object, so we mustn't bail out half-way through.
    me.localaddr = _inaddr_topy(&s_in)
    me.remoteaddr = _inaddr_topy(&s_out)
    me._user = _checkcallable(userproc, 'user proc')
    me._bad = _checkcallable(badproc, 'bad proc')
    me._error = _checkcallable(errorproc, 'error proc')
    me._failed = _checkcallable(failedproc, 'failed proc')
    me._bogus = _checkcallable(bogusproc, 'bogus proc')

    # Mark the request active first: if the library were to report the
    # result before `ident' returns (not confirmed from this file), then
    # `_identfunc' clears the flag again and it correctly stays clear.
    me._activep = 1
    ident(&me.irq, &_sel, &s_in, &s_out, _identfunc, me)

  def __dealloc__(me):
    # Cancel an outstanding request so that the library doesn't call back
    # into a dead object.
    if me._activep:
      ident_abort(&me.irq)

  # Whether the request is still outstanding.
  property activep:
    def __get__(me):
      return _tobool(me._activep)

  # The handler procedures.  Assignments are passed through
  # `_checkcallable'; deleting a handler resets it to `None'.
  property userproc:
    def __get__(me):
      return me._user
    def __set__(me, proc):
      me._user = _checkcallable(proc, 'user proc')
    def __del__(me):
      me._user = None
  property badproc:
    def __get__(me):
      return me._bad
    def __set__(me, proc):
      me._bad = _checkcallable(proc, 'bad proc')
    def __del__(me):
      me._bad = None
  property errorproc:
    def __get__(me):
      return me._error
    def __set__(me, proc):
      me._error = _checkcallable(proc, 'error proc')
    def __del__(me):
      me._error = None
  property failedproc:
    def __get__(me):
      return me._failed
    def __set__(me, proc):
      me._failed = _checkcallable(proc, 'failed proc')
    def __del__(me):
      me._failed = None
  property bogusproc:
    def __get__(me):
      return me._bogus
    def __set__(me, proc):
      me._bogus = _checkcallable(proc, 'bogus proc')
    def __del__(me):
      me._bogus = None

  def kill(me):
    """Abort the request; raises `ValueError' if it isn't active."""
    if not me._activep:
      raise ValueError, 'already disabled'
    ident_abort(&me.irq)
    me._dead()

  def _dead(me):
    """Internal: mark the request inactive and call the `dead' hook."""
    me._activep = 0
    me.dead()

  def dead(me):
    """Hook called when the request becomes inactive; does nothing."""
    pass

  def user(me, os, user):
    """Report a user-id reply: pass OS and USER to `userproc'."""
    return _maybecall(me._user, (os, user))

  def bad(me):
    """Report a bad reply: call `badproc', or else fall back to `bogus'."""
    if me._bad is not None:
      return me._bad()
    return me.bogus()

  def error(me, error):
    """Report an error reply: call `errorproc', or else `bogus'."""
    if me._error is not None:
      return me._error(error)
    return me.bogus()

  def failed(me, errno, strerror):
    """Report a failure: call `failedproc', or else `bogus'."""
    if me._failed is not None:
      return me._failed(errno, strerror)
    return me.bogus()

  def bogus(me):
    """Catch-all report: call `bogusproc' with no arguments."""
    return _maybecall(me._bogus, ())

cdef void _identfunc(ident_reply *i, void *arg):
  # Completion handler passed to `ident': ARG is the `SelIdentify' object
  # which made the request.  Mark it dead first, so that the callbacks see
  # an inactive request, and then dispatch on the reply type.  Reply types
  # other than the three tested here are silently ignored.
  cdef SelIdentify req
  req = arg
  req._dead()
  if i.type == IDENT_BAD:
    req.bad()
  elif i.type == IDENT_ERROR:
    req.error(i.u.error)
  elif i.type == IDENT_USERID:
    req.user(i.u.userid.os, i.u.userid.user)

#----- That's all, folks ----------------------------------------------------