Commit | Line | Data |
---|---|---|
efb0bf0e MW |
1 | #! /usr/bin/python |
2 | ### -*- coding: utf-8 -*- | |
3 | ||
965caf5f MW |
4 | import contextlib as CTX |
5 | import fcntl as FC | |
6 | import os as OS | |
7 | import socket as S | |
efb0bf0e MW |
8 | import sys as SYS |
9 | import mLib as M | |
10 | ||
## Text/bytes conversion shims.  The string literals in this script are
## native `str' objects, so real encoding and decoding is only needed on
## Python 3; on Python 2 both helpers pass their argument straight through.
if SYS.version_info < (3,):
    def _bin(text):
        """Return TEXT unchanged."""
        return text

    def _text(bin):
        """Return BIN unchanged."""
        return bin
else:
    def _bin(text):
        """Return TEXT encoded to bytes using the default codec."""
        return text.encode()

    def _text(bin):
        """Return the byte string BIN decoded using the default codec."""
        return bin.decode()

    ## Python 3 has no `xrange'; alias it so later code can use one name.
    xrange = range
18 | ||
def must_equal(x, y):
    """Check that X and Y compare equal.

    Returns None when `x == y' holds; otherwise raises AssertionError whose
    message shows the repr of both values.
    """
    if not (x == y):
        raise AssertionError("%r != %r" % (x, y))
22 | ||
23 | ## Begin. | |
24 | print(";; test begins (Python %s)" % SYS.version) | |
25 | ||
## crc32: the one-shot function and the chunked hasher are both checked
## against the same reference value for the input `abc'.
print(";; test crc32...")
crc_abc = 0x352441c2
must_equal(M.crc32(_bin("abc")), crc_abc)
must_equal(M.CRC32().chunk(_bin("a")).chunk(_bin("bc")).done(), crc_abc)
30 | ||
## unihash: check the unkeyed hash first, then the same input under an
## explicit key, and finally after installing that key value globally.
print(";; test unihash...")
unihash_nokey = 0xbf71f6a2
unihash_keyed = 0xecd1e2a2
unihash_keyval = 0x8498a262

## No key given: the hasher reports `None' as its key.
assert M.Unihash().key is None
must_equal(M.Unihash.hash(_bin("abc")), unihash_nokey)
must_equal(M.Unihash().chunk(_bin("a")).chunk(_bin("bc")).done(),
           unihash_nokey)

## Explicit key object: one-shot (two spellings) and chunked hashing.
key = M.UnihashKey(unihash_keyval)
assert M.Unihash(key).key is key
must_equal(M.Unihash.hash(_bin("abc"), key), unihash_keyed)
must_equal(key.hash(_bin("abc")), unihash_keyed)
must_equal(M.Unihash(key).chunk(_bin("a")).chunk(_bin("bc")).done(),
           unihash_keyed)

## This must stay last: once the global key is set, a keyless hash is
## expected to give the keyed result.
M.setglobalkey(unihash_keyval)
must_equal(M.Unihash.hash(_bin("abc")), unihash_keyed)
43 | ||
## atom: named atoms are interned, so asking for the same name twice yields
## the identical object; atoms made without a name are fresh each time and
## report themselves as not interned.
print(";; test atom...")
foo = M.Atom("foo")
bar = M.Atom("bär")
assert foo != bar
assert foo is M.Atom("foo")
assert bar is M.Atom("bär")
assert foo.internedp
must_equal(foo.name, "foo")
must_equal(bar.name, "bär")
gen = M.Atom()
assert gen is not M.Atom()
assert not gen.internedp

## Both named atoms must appear in the enumeration.  (This set used to be
## called `all', which shadowed the builtin for the rest of the script.)
all_atoms = set(M.atoms())
assert foo in all_atoms
assert bar in all_atoms
60 | ||
## assoc, sym
def test_mapping(mapcls, keya, keyz):
    """Exercise the basic mapping protocol on a fresh MAPCLS instance.

    MAPCLS is called with no arguments to make the table.  KEYA is stored
    and then overwritten; KEYZ is stored and then deleted, after which
    looking it up must raise KeyError.  At the end the table must hold
    exactly the one entry `KEYA -> 42'.
    """
    table = mapcls()

    ## Store, then overwrite, under the first key.
    for value in (69, 42):
        table[keya] = value
        must_equal(table[keya], value)

    ## Insert and remove the second key; it must then be gone.
    table[keyz] = 'zing'
    must_equal(table[keyz], 'zing')
    del table[keyz]
    try:
        table[keyz]
    except KeyError:
        pass
    else:
        assert False

    ## Only the first key survives, visible through all three views.
    must_equal(list(table.keys()), [keya])
    must_equal(list(table.values()), [42])
    must_equal(list(table.items()), [(keya, 42)])

## Association tables are driven with the atoms made earlier as keys,
## symbol tables with plain strings.
print(";; test assoc...")
test_mapping(M.AssocTable, foo, bar)
print(";; test sym...")
test_mapping(M.SymTable, 'foo', 'bar')
78 | ||
965caf5f MW |
## str
print(";; test str...")

## `word' returns the first word and the remaining text.
got = M.word(" foo bar baz")
must_equal(got, ("foo", "bar baz"))

## With `quotep' set, quotes group words and backslash escapes a quote.
got = M.word(" 'foo \\' bar' baz ", quotep = True)
must_equal(got, ("foo ' bar", "baz "))

## `split' returns a list of at most the requested number of words, plus
## whatever text is left over.
got = M.split(' one "two \\" three" four five six', 3, quotep = True)
must_equal(got, (["one", 'two " three', "four"], "five six"))

assert M.match("foo*bar", "food is served at the bar")

## The result is cut to 12 characters, with the spaces shown as `_'.
must_equal(M.sanitize("some awful string!", 12), "some_awful_s")

## A `~' suffix sorts before the bare version; a plain suffix sorts after.
assert M.versioncmp("1.2.5~pre99", "1.2.5") < 0
assert M.versioncmp("1.2.5pre99", "1.2.5") > 0
90 | ||
91 | ## codec | |
92 | ref = _bin("just >some ?string to encode") | |
def test_codecclass(enccls, deccls, encref):
    """Round-trip the module-level `ref' string through a codec class pair.

    ENCCLS is constructed as `enccls("!", 8, M.CDCF_LOWERC)' -- presumably
    line-indent text, maximum line length and flags; confirm against the
    mLib codec documentation -- and fed `ref' in two pieces; the output
    must equal ENCREF.  ENCREF with its `!' markers removed is then fed in
    two pieces to `deccls(M.CDCF_IGNCASE)' and must decode back to `ref'.
    Finally, a default-constructed decoder must reject `???' with a
    `M.CodecError' whose `err' is `M.CDCERR_INVCH'.
    """
    ## Encode in two pieces to exercise the incremental interface.
    encoder = enccls("!", 8, M.CDCF_LOWERC)
    encoded = encoder.encode(ref[:10])
    encoded += encoder.encode(ref[10:])
    encoded += encoder.done()
    must_equal(encoded, encref)

    ## Decode the same text, without the `!' markers, again in two pieces.
    stripped = encref.replace("!", "")
    decoder = deccls(M.CDCF_IGNCASE)
    decoded = decoder.decode(stripped[:15])
    decoded += decoder.decode(stripped[15:])
    decoded += decoder.done()
    must_equal(decoded, ref)

    ## Bad input must be reported as an invalid-character error; any other
    ## codec error is passed on to the caller.
    try:
        deccls().decode("???")
    except M.CodecError as err:
        if not (err.err == M.CDCERR_INVCH):
            raise
    else:
        assert False
def test_oldcodec(enccls, deccls, encref):
    """Round-trip the module-level `ref' string through an old-style codec.

    The encoder is made with no arguments and configured through its
    `indent' and `maxline' attributes.  `ref' is fed to it in two pieces
    and the output must equal ENCREF.  ENCREF with its `!' markers removed
    is then decoded in two pieces by a default-constructed DECCLS and must
    give back `ref'.
    """
    ## Old-style encoders are configured after construction.
    encoder = enccls()
    encoder.indent = "!"
    encoder.maxline = 8

    encoded = encoder.encode(ref[:10])
    encoded += encoder.encode(ref[10:])
    encoded += encoder.done()
    must_equal(encoded, encref)

    ## Decode the same text without the `!' markers.
    stripped = encref.replace("!", "")
    decoder = deccls()
    decoded = decoder.decode(stripped[:15])
    decoded += decoder.decode(stripped[15:])
    decoded += decoder.done()
    must_equal(decoded, ref)
124 | ||
## Expected encodings of `ref'.  A `!' marks each line break the encoders
## are asked to insert.  The three base64 flavours differ only in the two
## characters shown here as `+' and `/'.
print(";; test base64...")
b64_standard = "anVzdCA+!c29tZSA/!c3RyaW5n!IHRvIGVu!Y29kZQ=="
b64_file = "anVzdCA+!c29tZSA%!c3RyaW5n!IHRvIGVu!Y29kZQ=="
b64_url = "anVzdCA-!c29tZSA_!c3RyaW5n!IHRvIGVu!Y29kZQ=="
test_codecclass(M.Base64Encoder, M.Base64Decoder, b64_standard)
test_codecclass(M.File64Encoder, M.File64Decoder, b64_file)
test_codecclass(M.Base64URLEncoder, M.Base64URLDecoder, b64_url)
test_oldcodec(M.Base64Encode, M.Base64Decode, b64_standard)

## The new-style classes are asked for lowercase output; the old-style
## base32 encoder's expected output is uppercase.
print(";; test base32...")
test_codecclass(M.Base32Encoder, M.Base32Decoder,
                "nj2xg5ba!hzzw63lf!ea7xg5ds!nfxgoidu!n4qgk3td!n5sgk===")
test_codecclass(M.Base32HexEncoder, M.Base32HexDecoder,
                "d9qn6t10!7ppmurb5!40vn6t3i!d5n6e83k!dsg6arj3!dti6a===")
test_oldcodec(M.Base32Encode, M.Base32Decode,
              "NJ2XG5BA!HZZW63LF!EA7XG5DS!NFXGOIDU!N4QGK3TD!N5SGK===")

print(";; test hex...")
hex_expected = "6a757374!203e736f!6d65203f!73747269!6e672074!6f20656e!636f6465"
test_codecclass(M.HexEncoder, M.HexDecoder, hex_expected)
test_oldcodec(M.HexEncode, M.HexDecode, hex_expected)
148 | ||
## url: build one query string from three pairs, changing the encoder's
## mode between them, then decode it back again.
print(";; test url...")
uenc = M.URLEncode()

## Default mode: `/' is left alone and `!' is escaped.
uenc.encode("one", "some/string!")

## Strict mode: `/' is escaped as well.
uenc.strictp = True
uenc.encode("two", "some other/string!")

## Lax mode: `!' is left alone.
uenc.laxp = True
uenc.encode("three", "another!string")

r = uenc.result
must_equal(r, "one=some/string%21&two=some+other%2fstring%21&three=another!string")

## Decoding must give back the original pairs, in order.
url_pairs = [("one", "some/string!"),
             ("two", "some other/string!"),
             ("three", "another!string")]
must_equal(list(M.URLDecode(r)), url_pairs)
163 | ||
164 | ## report | |
@CTX.contextmanager
def stderr_test(want_out):
    """Context manager: capture what the body writes to fd 2 and check it.

    File descriptor 2 is pointed at a pipe while the `with' body runs.
    When the body finishes normally, everything written to the pipe is
    read back and must equal WANT_OUT (checked with `must_equal', which
    raises AssertionError on a mismatch).  The original fd 2 is restored
    whether or not the body raises; if it raises, the exception propagates
    and no comparison is made.
    """
    pin, pout = OS.pipe()
    fin = OS.fdopen(pin, 'r')
    olderr = OS.dup(2)
    OS.dup2(pout, 2)
    OS.close(pout)
    try:
        yield
        ## Close the last write end of the pipe, so that the read below
        ## sees end-of-file rather than blocking.
        OS.close(2)
        out = fin.read()
    finally:
        fin.close()
        OS.dup2(olderr, 2)
        ## The saved copy is no longer needed once fd 2 is back in place;
        ## without this close, every use of this helper leaks a descriptor.
        OS.close(olderr)
    must_equal(out, want_out)
print(";; test report...")

## Only the last path component becomes the program name.
M.ego("my/path/name")
must_equal(M.quis, "name")

## `moan' writes a message prefixed with the program name.
with stderr_test("name: test moaning\n"):
    M.moan("test moaning")

## `die' writes the same kind of message and then raises SystemExit with
## code 126.
with stderr_test("name: test death\n"):
    try:
        M.die("test death")
    except SystemExit as exc:
        assert exc.code == 126
    else:
        assert False
189 | ||
## fwatch
print(";; test fwatch...")
stamp_path = ",test-stamp"
stamp_new_path = ",test-stamp.new"

## Watch an open descriptor: nothing has changed yet, and a write must
## then be noticed.
fd = OS.open(stamp_path, OS.O_WRONLY | OS.O_CREAT, 0o666)
fw = M.FWatch(fd)
assert not fw.update()
OS.write(fd, _bin("stuff\n"))
assert fw.update()
assert fw.file is fd

## Put a different file under the same name.  The watcher is still
## following the descriptor, so it must report no change ...
fd2 = OS.open(stamp_new_path, OS.O_WRONLY | OS.O_CREAT, 0o666)
OS.rename(stamp_new_path, stamp_path)
assert not fw.update()

## ... until it is told to watch the name instead.
fw.file = stamp_path
assert fw.update()

## Tidy up.
OS.close(fd)
OS.close(fd2)
OS.unlink(stamp_path)
206 | ||
## fdflags: the checks below are consistent with each flag word becoming
## `(old & ~bic) ^ xor' -- the `f...' arguments acting on the file status
## flags (F_GETFL), the `fd...' ones on the descriptor flags (F_GETFD).
print(";; test fdflags...")
pin, pout = OS.pipe()

## Remember the starting flags before anything is changed.
ofl = FC.fcntl(pin, FC.F_GETFL)
ofd = FC.fcntl(pout, FC.F_GETFD)
fout = OS.fdopen(pout, 'wb')
nonblock = OS.O_NONBLOCK
cloexec = FC.FD_CLOEXEC

## Status flags, passing a bare descriptor number: set, then clear.
M.fdflags(pin, fbic = nonblock, fxor = nonblock)
assert FC.fcntl(pin, FC.F_GETFL) == ofl | nonblock
M.fdflags(pin, fbic = nonblock, fxor = 0)
assert FC.fcntl(pin, FC.F_GETFL) == ofl&~nonblock

## Descriptor flags, passing a file object: set, then clear.
M.fdflags(fout, fdbic = cloexec, fdxor = cloexec)
assert FC.fcntl(pout, FC.F_GETFD) == ofd | cloexec
M.fdflags(fout, fdbic = cloexec, fdxor = 0)
assert FC.fcntl(pout, FC.F_GETFD) == ofd&~cloexec

OS.close(pin)
fout.close()
223 | ||
## fdpass: send the read end of a loaded pipe across a socket pair, and
## check that both the accompanying message and the pipe's contents arrive.
print(";; test fdpass...")
pipe_payload = _bin("Hello, world!")
cover_note = _bin("Have a pipe!")

## Fill a pipe and close its write end, so only the read end is left.
pin, pout = OS.pipe()
fin = OS.fdopen(pin, 'r')
OS.write(pout, pipe_payload)
OS.close(pout)

## Send the read end, then drop every local handle on the sending side.
sk0, sk1 = S.socketpair(S.AF_UNIX, S.SOCK_STREAM)
M.fdsend(sk0, fin, cover_note)
fin.close()
sk0.close()

## The receiver gets a descriptor and the message.
fd, msg = M.fdrecv(sk1, 16)
sk1.close()
must_equal(msg, cover_note)

## The received descriptor must yield the data written to the pipe.
data = OS.read(fd, 16)
OS.close(fd)
must_equal(data, pipe_payload)
240 | ||
241 | ## mdup | |
803869bc MW |
242 | print(";; test mdup...") |
def mkfd():
    """Return a write-only descriptor for a scratch file with no name.

    The file `,delete-me' is created in the current directory if necessary,
    opened for writing, and unlinked straight away, so nothing is left
    behind on disk once the descriptor is closed.
    """
    scratch = ",delete-me"
    handle = OS.open(scratch, OS.O_WRONLY | OS.O_CREAT, 0o666)
    OS.unlink(scratch)
    return handle
def fid(fd):
    """Return the `(st_dev, st_ino)' pair that `fstat' reports for FD."""
    info = OS.fstat(fd)
    return (info.st_dev, info.st_ino)
## Make five distinct scratch files and note the identity of each.
initial = [mkfd() for i in xrange(5)]
ref = [fid(fd) for fd in initial]

## Each pair is `(current descriptor, wanted descriptor)'.  The first three
## form a cycle, and file 0 is wanted in several places at once.  A wanted
## value of -1 is skipped by the number check below (presumably it means
## "any descriptor will do" -- confirm against the mdup documentation).
op = [(initial[0], initial[1]),
      (initial[1], initial[2]),
      (initial[2], initial[0]),
      (initial[0], initial[3]),
      (initial[3], -1),
      (initial[0], initial[4])]
M.mdup(op)

## `mdup' is expected to have rewritten OP in place: wherever a particular
## descriptor was asked for, the current one must now match it.
for have, want in op:
    if want != -1:
        must_equal(have, want)

## For each slot: the descriptor number it must now hold (None where no
## number is checked) and the index in REF of the file it must refer to.
mdup_expect = [(initial[1], 0),
               (initial[2], 1),
               (initial[0], 2),
               (initial[3], 0),
               (None, 3),
               (initial[4], 0)]
for slot, (want_fd, src) in enumerate(mdup_expect):
    if want_fd is not None:
        must_equal(op[slot][0], want_fd)
    must_equal(fid(op[slot][0]), ref[src])

## Tidy up.
for fd, _ in op:
    OS.close(fd)
268 | ||
269 | ## (not testing detachtty and daemonize) | |
270 | ||
965caf5f MW |
271 | |
272 | ||
efb0bf0e MW |
273 | ## Done! |
274 | print(";; test completed OK") |