@@@ cython and python 3 wip
[mLib-python] / test.py
diff --git a/test.py b/test.py
index 33f781e..9cd09e2 100644 (file)
--- a/test.py
+++ b/test.py
@@ -1,6 +1,10 @@
 #! /usr/bin/python
 ### -*- coding: utf-8 -*-
 
+import contextlib as CTX
+import fcntl as FC
+import os as OS
+import socket as S
 import sys as SYS
 import mLib as M
 
@@ -19,10 +23,12 @@ def must_equal(x, y):
 print(";; test begins (Python %s)" % SYS.version)
 
 ## crc32
+print(";; test crc32...")
 must_equal(M.crc32(_bin("abc")), 0x352441c2)
 must_equal(M.CRC32().chunk(_bin("a")).chunk(_bin("bc")).done(), 0x352441c2)
 
 ## unihash
+print(";; test unihash...")
 assert M.Unihash().key is None
 must_equal(M.Unihash.hash(_bin("abc")), 0xbf71f6a2)
 must_equal(M.Unihash().chunk(_bin("a")).chunk(_bin("bc")).done(), 0xbf71f6a2)
@@ -35,6 +41,7 @@ M.setglobalkey(0x8498a262)
 must_equal(M.Unihash.hash(_bin("abc")), 0xecd1e2a2)
 
 ## atom
+print(";; test atom...")
 foo = M.Atom("foo")
 bar = M.Atom("bär")
 assert foo != bar
@@ -63,8 +70,176 @@ def test_mapping(mapcls, keya, keyz):
   must_equal(list(tab.keys()), [keya])
   must_equal(list(tab.values()), [42])
   must_equal(list(tab.items()), [(keya, 42)])
+print(";; test assoc...")
 test_mapping(M.AssocTable, foo, bar)
+print(";; test sym...")
 test_mapping(M.SymTable, 'foo', 'bar')
 
+## str
+print(";; test str...")
+must_equal(M.word(" foo  bar baz"), ("foo", "bar baz"))
+must_equal(M.word("  'foo \\' bar'  baz ", quotep = True),
+           ("foo ' bar", "baz "))
+must_equal(M.split(' one "two \\" three" four five six', 3, quotep = True),
+           (["one", 'two " three', "four"], "five six"))
+assert M.match("foo*bar", "food is served at the bar")
+must_equal(M.sanitize("some awful string!", 12), "some_awful_s")
+assert M.versioncmp("1.2.5~pre99", "1.2.5") < 0
+assert M.versioncmp("1.2.5pre99", "1.2.5") > 0
+
+## codec
+ref = _bin("just >some ?string to encode")
+def test_codecclass(enccls, deccls, encref):
+  enc = enccls("!", 8, M.CDCF_LOWERC)
+  e = enc.encode(ref[:10])
+  e += enc.encode(ref[10:])
+  e += enc.done()
+  must_equal(e, encref)
+  encref = encref.replace("!", "")
+  dec = deccls(M.CDCF_IGNCASE)
+  d = dec.decode(encref[:15])
+  d += dec.decode(encref[15:])
+  d += dec.done()
+  must_equal(d, ref)
+  try: deccls().decode("???")
+  except M.CodecError as e:
+    if e.err == M.CDCERR_INVCH: pass
+    else: raise
+  else: assert False
+def test_oldcodec(enccls, deccls, encref):
+  enc = enccls()
+  enc.indent = "!"
+  enc.maxline = 8
+  e = enc.encode(ref[:10])
+  e += enc.encode(ref[10:])
+  e += enc.done()
+  must_equal(e, encref)
+  encref = encref.replace("!", "")
+  dec = deccls()
+  d = dec.decode(encref[:15])
+  d += dec.decode(encref[15:])
+  d += dec.done()
+  must_equal(d, ref)
+
+print(";; test base64...")
+test_codecclass(M.Base64Encoder, M.Base64Decoder,
+                "anVzdCA+!c29tZSA/!c3RyaW5n!IHRvIGVu!Y29kZQ==")
+test_codecclass(M.File64Encoder, M.File64Decoder,
+                "anVzdCA+!c29tZSA%!c3RyaW5n!IHRvIGVu!Y29kZQ==")
+test_codecclass(M.Base64URLEncoder, M.Base64URLDecoder,
+                "anVzdCA-!c29tZSA_!c3RyaW5n!IHRvIGVu!Y29kZQ==")
+test_oldcodec(M.Base64Encode, M.Base64Decode,
+              "anVzdCA+!c29tZSA/!c3RyaW5n!IHRvIGVu!Y29kZQ==")
+
+print(";; test base32...")
+test_codecclass(M.Base32Encoder, M.Base32Decoder,
+                "nj2xg5ba!hzzw63lf!ea7xg5ds!nfxgoidu!n4qgk3td!n5sgk===")
+test_codecclass(M.Base32HexEncoder, M.Base32HexDecoder,
+                "d9qn6t10!7ppmurb5!40vn6t3i!d5n6e83k!dsg6arj3!dti6a===")
+test_oldcodec(M.Base32Encode, M.Base32Decode,
+              "NJ2XG5BA!HZZW63LF!EA7XG5DS!NFXGOIDU!N4QGK3TD!N5SGK===")
+
+print(";; test hex...")
+test_codecclass(M.HexEncoder, M.HexDecoder,
+                "6a757374!203e736f!6d65203f!73747269!6e672074!6f20656e!636f6465")
+test_oldcodec(M.HexEncode, M.HexDecode,
+              "6a757374!203e736f!6d65203f!73747269!6e672074!6f20656e!636f6465")
+
+## url
+print(";; test url...")
+uenc = M.URLEncode()
+uenc.encode("one", "some/string!")
+uenc.strictp = True
+uenc.encode("two", "some other/string!")
+uenc.laxp = True
+uenc.encode("three", "another!string")
+r = uenc.result
+must_equal(r, "one=some/string%21&two=some+other%2fstring%21&three=another!string")
+must_equal(list(M.URLDecode(r)),
+           [("one", "some/string!"),
+            ("two", "some other/string!"),
+            ("three", "another!string")])
+
+## report
+@CTX.contextmanager
+def stderr_test(want_out):
+  pin, pout = OS.pipe()
+  fin = OS.fdopen(pin, 'r')
+  olderr = OS.dup(2)
+  OS.dup2(pout, 2)
+  OS.close(pout)
+  try:
+    yield
+    OS.close(2)
+    out = fin.read()
+  finally:
+    fin.close()
+    OS.dup2(olderr, 2)
+  must_equal(out, want_out)
+print(";; test report...")
+M.ego("my/path/name")
+must_equal(M.quis, "name")
+with stderr_test("name: test moaning\n"):
+  M.moan("test moaning")
+with stderr_test("name: test death\n"):
+  try: M.die("test death")
+  except SystemExit as e: assert e.code == 126
+  else: assert False
+
+## fwatch
+print(";; test fwatch...")
+fd = OS.open(",test-stamp", OS.O_WRONLY | OS.O_CREAT, 0o666)
+fw = M.FWatch(fd)
+assert not fw.update()
+OS.write(fd, _bin("stuff\n"))
+assert fw.update()
+assert fw.file is fd
+fd2 = OS.open(",test-stamp.new", OS.O_WRONLY | OS.O_CREAT, 0o666)
+OS.rename(",test-stamp.new", ",test-stamp")
+assert not fw.update()
+fw.file = ",test-stamp"
+assert fw.update()
+OS.close(fd)
+OS.close(fd2)
+OS.unlink(",test-stamp")
+
+## fdflags
+print(";; test fdflags...")
+pin, pout = OS.pipe()
+ofl = FC.fcntl(pin, FC.F_GETFL)
+ofd = FC.fcntl(pout, FC.F_GETFD)
+fout = OS.fdopen(pout, 'wb')
+M.fdflags(pin, fbic = OS.O_NONBLOCK, fxor = OS.O_NONBLOCK)
+assert FC.fcntl(pin, FC.F_GETFL) == ofl | OS.O_NONBLOCK
+M.fdflags(pin, fbic = OS.O_NONBLOCK, fxor = 0)
+assert FC.fcntl(pin, FC.F_GETFL) == ofl&~OS.O_NONBLOCK
+M.fdflags(fout, fdbic = FC.FD_CLOEXEC, fdxor = FC.FD_CLOEXEC)
+assert FC.fcntl(pout, FC.F_GETFD) == ofd | FC.FD_CLOEXEC
+M.fdflags(fout, fdbic = FC.FD_CLOEXEC, fdxor = 0)
+assert FC.fcntl(pout, FC.F_GETFD) == ofd&~FC.FD_CLOEXEC
+OS.close(pin)
+fout.close()
+
+## fdpass
+print(";; test fdpass...")
+pin, pout = OS.pipe()
+fin = OS.fdopen(pin, 'r')
+OS.write(pout, _bin("Hello, world!"))
+OS.close(pout)
+sk0, sk1 = S.socketpair(S.AF_UNIX, S.SOCK_STREAM)
+M.fdsend(sk0, fin, _bin("Have a pipe!"))
+fin.close()
+sk0.close()
+fd, msg = M.fdrecv(sk1, 16)
+sk1.close()
+must_equal(msg, _bin("Have a pipe!"))
+data = OS.read(fd, 16)
+OS.close(fd)
+must_equal(data, _bin("Hello, world!"))
+
+## mdup
+## print(";; test mdup...")
+
+
 ## Done!
 print(";; test completed OK")