Commit | Line | Data |
---|---|---|
efb0bf0e MW |
1 | #! /usr/bin/python |
2 | ### -*- coding: utf-8 -*- | |
3 | ||
965caf5f MW |
4 | import contextlib as CTX |
5 | import fcntl as FC | |
6 | import os as OS | |
7 | import socket as S | |
efb0bf0e MW |
8 | import sys as SYS |
9 | import mLib as M | |
10 | ||
11 | if SYS.version_info >= (3,): | |
12 | def _bin(text): return text.encode() | |
13 | def _text(bin): return bin.decode() | |
14 | else: | |
15 | def _bin(text): return text | |
16 | def _text(bin): return bin | |
17 | ||
18 | def must_equal(x, y): | |
19 | if x == y: pass | |
20 | else: raise AssertionError("%r != %r" % (x, y)) | |
21 | ||
22 | ## Done! | |
23 | print(";; test begins (Python %s)" % SYS.version) | |
24 | ||
25 | ## crc32 | |
965caf5f | 26 | print(";; test crc32...") |
efb0bf0e MW |
27 | must_equal(M.crc32(_bin("abc")), 0x352441c2) |
28 | must_equal(M.CRC32().chunk(_bin("a")).chunk(_bin("bc")).done(), 0x352441c2) | |
29 | ||
30 | ## unihash | |
965caf5f | 31 | print(";; test unihash...") |
efb0bf0e MW |
32 | assert M.Unihash().key is None |
33 | must_equal(M.Unihash.hash(_bin("abc")), 0xbf71f6a2) | |
34 | must_equal(M.Unihash().chunk(_bin("a")).chunk(_bin("bc")).done(), 0xbf71f6a2) | |
35 | key = M.UnihashKey(0x8498a262) | |
36 | assert M.Unihash(key).key is key | |
37 | must_equal(M.Unihash.hash(_bin("abc"), key), 0xecd1e2a2) | |
38 | must_equal(key.hash(_bin("abc")), 0xecd1e2a2) | |
39 | must_equal(M.Unihash(key).chunk(_bin("a")).chunk(_bin("bc")).done(), 0xecd1e2a2) | |
40 | M.setglobalkey(0x8498a262) | |
41 | must_equal(M.Unihash.hash(_bin("abc")), 0xecd1e2a2) | |
42 | ||
43 | ## atom | |
965caf5f | 44 | print(";; test atom...") |
efb0bf0e MW |
45 | foo = M.Atom("foo") |
46 | bar = M.Atom("bär") | |
47 | assert foo != bar | |
48 | assert foo is M.Atom("foo") | |
49 | assert bar is M.Atom("bär") | |
50 | assert foo.internedp | |
51 | must_equal(foo.name, "foo") | |
52 | must_equal(bar.name, "bär") | |
53 | gen = M.Atom() | |
54 | assert gen is not M.Atom() | |
55 | assert not gen.internedp | |
56 | all = set(M.atoms()) | |
57 | assert foo in all | |
58 | assert bar in all | |
59 | ||
60 | ## assoc, sym | |
61 | def test_mapping(mapcls, keya, keyz): | |
62 | tab = mapcls() | |
63 | tab[keya] = 69; must_equal(tab[keya], 69) | |
64 | tab[keya] = 42; must_equal(tab[keya], 42) | |
65 | tab[keyz] = 'zing'; must_equal(tab[keyz], 'zing') | |
66 | del tab[keyz] | |
67 | try: tab[keyz] | |
68 | except KeyError: pass | |
69 | else: assert False | |
70 | must_equal(list(tab.keys()), [keya]) | |
71 | must_equal(list(tab.values()), [42]) | |
72 | must_equal(list(tab.items()), [(keya, 42)]) | |
965caf5f | 73 | print(";; test assoc...") |
efb0bf0e | 74 | test_mapping(M.AssocTable, foo, bar) |
965caf5f | 75 | print(";; test sym...") |
efb0bf0e MW |
76 | test_mapping(M.SymTable, 'foo', 'bar') |
77 | ||
965caf5f MW |
78 | ## str |
79 | print(";; test str...") | |
80 | must_equal(M.word(" foo bar baz"), ("foo", "bar baz")) | |
81 | must_equal(M.word(" 'foo \\' bar' baz ", quotep = True), | |
82 | ("foo ' bar", "baz ")) | |
83 | must_equal(M.split(' one "two \\" three" four five six', 3, quotep = True), | |
84 | (["one", 'two " three', "four"], "five six")) | |
85 | assert M.match("foo*bar", "food is served at the bar") | |
86 | must_equal(M.sanitize("some awful string!", 12), "some_awful_s") | |
87 | assert M.versioncmp("1.2.5~pre99", "1.2.5") < 0 | |
88 | assert M.versioncmp("1.2.5pre99", "1.2.5") > 0 | |
89 | ||
90 | ## codec | |
91 | ref = _bin("just >some ?string to encode") | |
92 | def test_codecclass(enccls, deccls, encref): | |
93 | enc = enccls("!", 8, M.CDCF_LOWERC) | |
94 | e = enc.encode(ref[:10]) | |
95 | e += enc.encode(ref[10:]) | |
96 | e += enc.done() | |
97 | must_equal(e, encref) | |
98 | encref = encref.replace("!", "") | |
99 | dec = deccls(M.CDCF_IGNCASE) | |
100 | d = dec.decode(encref[:15]) | |
101 | d += dec.decode(encref[15:]) | |
102 | d += dec.done() | |
103 | must_equal(d, ref) | |
104 | try: deccls().decode("???") | |
105 | except M.CodecError as e: | |
106 | if e.err == M.CDCERR_INVCH: pass | |
107 | else: raise | |
108 | else: assert False | |
109 | def test_oldcodec(enccls, deccls, encref): | |
110 | enc = enccls() | |
111 | enc.indent = "!" | |
112 | enc.maxline = 8 | |
113 | e = enc.encode(ref[:10]) | |
114 | e += enc.encode(ref[10:]) | |
115 | e += enc.done() | |
116 | must_equal(e, encref) | |
117 | encref = encref.replace("!", "") | |
118 | dec = deccls() | |
119 | d = dec.decode(encref[:15]) | |
120 | d += dec.decode(encref[15:]) | |
121 | d += dec.done() | |
122 | must_equal(d, ref) | |
123 | ||
124 | print(";; test base64...") | |
125 | test_codecclass(M.Base64Encoder, M.Base64Decoder, | |
126 | "anVzdCA+!c29tZSA/!c3RyaW5n!IHRvIGVu!Y29kZQ==") | |
127 | test_codecclass(M.File64Encoder, M.File64Decoder, | |
128 | "anVzdCA+!c29tZSA%!c3RyaW5n!IHRvIGVu!Y29kZQ==") | |
129 | test_codecclass(M.Base64URLEncoder, M.Base64URLDecoder, | |
130 | "anVzdCA-!c29tZSA_!c3RyaW5n!IHRvIGVu!Y29kZQ==") | |
131 | test_oldcodec(M.Base64Encode, M.Base64Decode, | |
132 | "anVzdCA+!c29tZSA/!c3RyaW5n!IHRvIGVu!Y29kZQ==") | |
133 | ||
134 | print(";; test base32...") | |
135 | test_codecclass(M.Base32Encoder, M.Base32Decoder, | |
136 | "nj2xg5ba!hzzw63lf!ea7xg5ds!nfxgoidu!n4qgk3td!n5sgk===") | |
137 | test_codecclass(M.Base32HexEncoder, M.Base32HexDecoder, | |
138 | "d9qn6t10!7ppmurb5!40vn6t3i!d5n6e83k!dsg6arj3!dti6a===") | |
139 | test_oldcodec(M.Base32Encode, M.Base32Decode, | |
140 | "NJ2XG5BA!HZZW63LF!EA7XG5DS!NFXGOIDU!N4QGK3TD!N5SGK===") | |
141 | ||
142 | print(";; test hex...") | |
143 | test_codecclass(M.HexEncoder, M.HexDecoder, | |
144 | "6a757374!203e736f!6d65203f!73747269!6e672074!6f20656e!636f6465") | |
145 | test_oldcodec(M.HexEncode, M.HexDecode, | |
146 | "6a757374!203e736f!6d65203f!73747269!6e672074!6f20656e!636f6465") | |
147 | ||
148 | ## url | |
149 | print(";; test url...") | |
150 | uenc = M.URLEncode() | |
151 | uenc.encode("one", "some/string!") | |
152 | uenc.strictp = True | |
153 | uenc.encode("two", "some other/string!") | |
154 | uenc.laxp = True | |
155 | uenc.encode("three", "another!string") | |
156 | r = uenc.result | |
157 | must_equal(r, "one=some/string%21&two=some+other%2fstring%21&three=another!string") | |
158 | must_equal(list(M.URLDecode(r)), | |
159 | [("one", "some/string!"), | |
160 | ("two", "some other/string!"), | |
161 | ("three", "another!string")]) | |
162 | ||
163 | ## report | |
164 | @CTX.contextmanager | |
165 | def stderr_test(want_out): | |
166 | pin, pout = OS.pipe() | |
167 | fin = OS.fdopen(pin, 'r') | |
168 | olderr = OS.dup(2) | |
169 | OS.dup2(pout, 2) | |
170 | OS.close(pout) | |
171 | try: | |
172 | yield | |
173 | OS.close(2) | |
174 | out = fin.read() | |
175 | finally: | |
176 | fin.close() | |
177 | OS.dup2(olderr, 2) | |
178 | must_equal(out, want_out) | |
179 | print(";; test report...") | |
180 | M.ego("my/path/name") | |
181 | must_equal(M.quis, "name") | |
182 | with stderr_test("name: test moaning\n"): | |
183 | M.moan("test moaning") | |
184 | with stderr_test("name: test death\n"): | |
185 | try: M.die("test death") | |
186 | except SystemExit as e: assert e.code == 126 | |
187 | else: assert False | |
188 | ||
189 | ## fwatch | |
190 | print(";; test fwatch...") | |
191 | fd = OS.open(",test-stamp", OS.O_WRONLY | OS.O_CREAT, 0o666) | |
192 | fw = M.FWatch(fd) | |
193 | assert not fw.update() | |
194 | OS.write(fd, _bin("stuff\n")) | |
195 | assert fw.update() | |
196 | assert fw.file is fd | |
197 | fd2 = OS.open(",test-stamp.new", OS.O_WRONLY | OS.O_CREAT, 0o666) | |
198 | OS.rename(",test-stamp.new", ",test-stamp") | |
199 | assert not fw.update() | |
200 | fw.file = ",test-stamp" | |
201 | assert fw.update() | |
202 | OS.close(fd) | |
203 | OS.close(fd2) | |
204 | OS.unlink(",test-stamp") | |
205 | ||
206 | ## fdflags | |
207 | print(";; test fdflags...") | |
208 | pin, pout = OS.pipe() | |
209 | ofl = FC.fcntl(pin, FC.F_GETFL) | |
210 | ofd = FC.fcntl(pout, FC.F_GETFD) | |
211 | fout = OS.fdopen(pout, 'wb') | |
212 | M.fdflags(pin, fbic = OS.O_NONBLOCK, fxor = OS.O_NONBLOCK) | |
213 | assert FC.fcntl(pin, FC.F_GETFL) == ofl | OS.O_NONBLOCK | |
214 | M.fdflags(pin, fbic = OS.O_NONBLOCK, fxor = 0) | |
215 | assert FC.fcntl(pin, FC.F_GETFL) == ofl&~OS.O_NONBLOCK | |
216 | M.fdflags(fout, fdbic = FC.FD_CLOEXEC, fdxor = FC.FD_CLOEXEC) | |
217 | assert FC.fcntl(pout, FC.F_GETFD) == ofd | FC.FD_CLOEXEC | |
218 | M.fdflags(fout, fdbic = FC.FD_CLOEXEC, fdxor = 0) | |
219 | assert FC.fcntl(pout, FC.F_GETFD) == ofd&~FC.FD_CLOEXEC | |
220 | OS.close(pin) | |
221 | fout.close() | |
222 | ||
223 | ## fdpass | |
224 | print(";; test fdpass...") | |
225 | pin, pout = OS.pipe() | |
226 | fin = OS.fdopen(pin, 'r') | |
227 | OS.write(pout, _bin("Hello, world!")) | |
228 | OS.close(pout) | |
229 | sk0, sk1 = S.socketpair(S.AF_UNIX, S.SOCK_STREAM) | |
230 | M.fdsend(sk0, fin, _bin("Have a pipe!")) | |
231 | fin.close() | |
232 | sk0.close() | |
233 | fd, msg = M.fdrecv(sk1, 16) | |
234 | sk1.close() | |
235 | must_equal(msg, _bin("Have a pipe!")) | |
236 | data = OS.read(fd, 16) | |
237 | OS.close(fd) | |
238 | must_equal(data, _bin("Hello, world!")) | |
239 | ||
240 | ## mdup | |
241 | ## print(";; test mdup...") | |
242 | ||
243 | ||
efb0bf0e MW |
244 | ## Done! |
245 | print(";; test completed OK") |