2 ### -*- coding: utf-8 -*-
4 import contextlib
as CTX
11 if SYS
.version_info
>= (3,):
12 def _bin(text
): return text
.encode()
13 def _text(bin
): return bin
.decode()
16 def _bin(text
): return text
17 def _text(bin
): return bin
21 else: raise AssertionError("%r != %r" %
(x
, y
))
24 print(";; test begins (Python %s)" % SYS
.version
)
27 print(";; test crc32...")
28 must_equal(M
.crc32(_bin("abc")), 0x352441c2)
29 must_equal(M
.CRC32().chunk(_bin("a")).chunk(_bin("bc")).done(), 0x352441c2)
32 print(";; test unihash...")
33 assert M
.Unihash().key
is None
34 must_equal(M
.Unihash
.hash(_bin("abc")), 0xbf71f6a2)
35 must_equal(M
.Unihash().chunk(_bin("a")).chunk(_bin("bc")).done(), 0xbf71f6a2)
36 key
= M
.UnihashKey(0x8498a262)
37 assert M
.Unihash(key
).key
is key
38 must_equal(M
.Unihash
.hash(_bin("abc"), key
), 0xecd1e2a2)
39 must_equal(key
.hash(_bin("abc")), 0xecd1e2a2)
40 must_equal(M
.Unihash(key
).chunk(_bin("a")).chunk(_bin("bc")).done(), 0xecd1e2a2)
41 M
.setglobalkey(0x8498a262)
42 must_equal(M
.Unihash
.hash(_bin("abc")), 0xecd1e2a2)
45 print(";; test atom...")
49 assert foo
is M
.Atom("foo")
50 assert bar
is M
.Atom("bär")
52 must_equal(foo
.name
, "foo")
53 must_equal(bar
.name
, "bär")
55 assert gen
is not M
.Atom()
56 assert not gen
.internedp
62 def test_mapping(mapcls
, keya
, keyz
):
64 tab
[keya
] = 69; must_equal(tab
[keya
], 69)
65 tab
[keya
] = 42; must_equal(tab
[keya
], 42)
66 tab
[keyz
] = 'zing'; must_equal(tab
[keyz
], 'zing')
71 must_equal(list(tab
.keys()), [keya
])
72 must_equal(list(tab
.values()), [42])
73 must_equal(list(tab
.items()), [(keya
, 42)])
74 print(";; test assoc...")
75 test_mapping(M
.AssocTable
, foo
, bar
)
76 print(";; test sym...")
77 test_mapping(M
.SymTable
, 'foo', 'bar')
80 print(";; test str...")
81 must_equal(M
.word(" foo bar baz"), ("foo", "bar baz"))
82 must_equal(M
.word(" 'foo \\' bar' baz ", quotep
= True),
83 ("foo ' bar", "baz "))
84 must_equal(M
.split(' one "two \\" three" four five six', 3, quotep
= True),
85 (["one", 'two " three', "four"], "five six"))
86 assert M
.match("foo*bar", "food is served at the bar")
87 must_equal(M
.sanitize("some awful string!", 12), "some_awful_s")
88 assert M
.versioncmp("1.2.5~pre99", "1.2.5") < 0
89 assert M
.versioncmp("1.2.5pre99", "1.2.5") > 0
92 ref
= _bin("just >some ?string to encode")
93 def test_codecclass(enccls
, deccls
, encref
):
94 enc
= enccls("!", 8, M
.CDCF_LOWERC
)
95 e
= enc
.encode(ref
[:10])
96 e
+= enc
.encode(ref
[10:])
99 encref
= encref
.replace("!", "")
100 dec
= deccls(M
.CDCF_IGNCASE
)
101 d
= dec
.decode(encref
[:15])
102 d
+= dec
.decode(encref
[15:])
105 try: deccls().decode("???")
106 except M
.CodecError
as e
:
107 if e
.err
== M
.CDCERR_INVCH
: pass
110 def test_oldcodec(enccls
, deccls
, encref
):
114 e
= enc
.encode(ref
[:10])
115 e
+= enc
.encode(ref
[10:])
117 must_equal(e
, encref
)
118 encref
= encref
.replace("!", "")
120 d
= dec
.decode(encref
[:15])
121 d
+= dec
.decode(encref
[15:])
125 print(";; test base64...")
126 test_codecclass(M
.Base64Encoder
, M
.Base64Decoder
,
127 "anVzdCA+!c29tZSA/!c3RyaW5n!IHRvIGVu!Y29kZQ==")
128 test_codecclass(M
.File64Encoder
, M
.File64Decoder
,
129 "anVzdCA+!c29tZSA%!c3RyaW5n!IHRvIGVu!Y29kZQ==")
130 test_codecclass(M
.Base64URLEncoder
, M
.Base64URLDecoder
,
131 "anVzdCA-!c29tZSA_!c3RyaW5n!IHRvIGVu!Y29kZQ==")
132 test_oldcodec(M
.Base64Encode
, M
.Base64Decode
,
133 "anVzdCA+!c29tZSA/!c3RyaW5n!IHRvIGVu!Y29kZQ==")
135 print(";; test base32...")
136 test_codecclass(M
.Base32Encoder
, M
.Base32Decoder
,
137 "nj2xg5ba!hzzw63lf!ea7xg5ds!nfxgoidu!n4qgk3td!n5sgk===")
138 test_codecclass(M
.Base32HexEncoder
, M
.Base32HexDecoder
,
139 "d9qn6t10!7ppmurb5!40vn6t3i!d5n6e83k!dsg6arj3!dti6a===")
140 test_oldcodec(M
.Base32Encode
, M
.Base32Decode
,
141 "NJ2XG5BA!HZZW63LF!EA7XG5DS!NFXGOIDU!N4QGK3TD!N5SGK===")
143 print(";; test hex...")
144 test_codecclass(M
.HexEncoder
, M
.HexDecoder
,
145 "6a757374!203e736f!6d65203f!73747269!6e672074!6f20656e!636f6465")
146 test_oldcodec(M
.HexEncode
, M
.HexDecode
,
147 "6a757374!203e736f!6d65203f!73747269!6e672074!6f20656e!636f6465")
150 print(";; test url...")
152 uenc
.encode("one", "some/string!")
154 uenc
.encode("two", "some other/string!")
156 uenc
.encode("three", "another!string")
158 must_equal(r
, "one=some/string%21&two=some+other%2fstring%21&three=another!string")
159 must_equal(list(M
.URLDecode(r
)),
160 [("one", "some/string!"),
161 ("two", "some other/string!"),
162 ("three", "another!string")])
166 def stderr_test(want_out
):
167 pin
, pout
= OS
.pipe()
168 fin
= OS
.fdopen(pin
, 'r')
179 must_equal(out
, want_out
)
180 print(";; test report...")
181 M
.ego("my/path/name")
182 must_equal(M
.quis
, "name")
183 with
stderr_test("name: test moaning\n"):
184 M
.moan("test moaning")
185 with
stderr_test("name: test death\n"):
186 try: M
.die("test death")
187 except SystemExit as e
: assert e
.code
== 126
191 print(";; test fwatch...")
192 fd
= OS
.open(",test-stamp", OS
.O_WRONLY | OS
.O_CREAT
, 0o666
)
194 assert not fw
.update()
195 OS
.write(fd
, _bin("stuff\n"))
198 fd2
= OS
.open(",test-stamp.new", OS
.O_WRONLY | OS
.O_CREAT
, 0o666
)
199 OS
.rename(",test-stamp.new", ",test-stamp")
200 assert not fw
.update()
201 fw
.file = ",test-stamp"
205 OS
.unlink(",test-stamp")
208 print(";; test fdflags...")
209 pin
, pout
= OS
.pipe()
210 ofl
= FC
.fcntl(pin
, FC
.F_GETFL
)
211 ofd
= FC
.fcntl(pout
, FC
.F_GETFD
)
212 fout
= OS
.fdopen(pout
, 'wb')
213 M
.fdflags(pin
, fbic
= OS
.O_NONBLOCK
, fxor
= OS
.O_NONBLOCK
)
214 assert FC
.fcntl(pin
, FC
.F_GETFL
) == ofl | OS
.O_NONBLOCK
215 M
.fdflags(pin
, fbic
= OS
.O_NONBLOCK
, fxor
= 0)
216 assert FC
.fcntl(pin
, FC
.F_GETFL
) == ofl
&~OS
.O_NONBLOCK
217 M
.fdflags(fout
, fdbic
= FC
.FD_CLOEXEC
, fdxor
= FC
.FD_CLOEXEC
)
218 assert FC
.fcntl(pout
, FC
.F_GETFD
) == ofd | FC
.FD_CLOEXEC
219 M
.fdflags(fout
, fdbic
= FC
.FD_CLOEXEC
, fdxor
= 0)
220 assert FC
.fcntl(pout
, FC
.F_GETFD
) == ofd
&~FC
.FD_CLOEXEC
225 print(";; test fdpass...")
226 pin
, pout
= OS
.pipe()
227 fin
= OS
.fdopen(pin
, 'r')
228 OS
.write(pout
, _bin("Hello, world!"))
230 sk0
, sk1
= S
.socketpair(S
.AF_UNIX
, S
.SOCK_STREAM
)
231 M
.fdsend(sk0
, fin
, _bin("Have a pipe!"))
234 fd
, msg
= M
.fdrecv(sk1
, 16)
236 must_equal(msg
, _bin("Have a pipe!"))
237 data
= OS
.read(fd
, 16)
239 must_equal(data
, _bin("Hello, world!"))
242 print(";; test mdup...")
244 fd
= OS
.open(",delete-me", OS
.O_WRONLY | OS
.O_CREAT
, 0o666
)
245 OS
.unlink(",delete-me")
249 return st
.st_dev
, st
.st_ino
250 initial
= [mkfd() for i
in xrange(5)]
251 ref
= [fid(fd
) for fd
in initial
]
252 op
= [(initial
[0], initial
[1]),
253 (initial
[1], initial
[2]),
254 (initial
[2], initial
[0]),
255 (initial
[0], initial
[3]),
257 (initial
[0], initial
[4])]
259 for have
, want
in op
:
260 if want
!= -1: must_equal(have
, want
)
261 must_equal(op
[0][0], initial
[1]); must_equal(fid(op
[0][0]), ref
[0])
262 must_equal(op
[1][0], initial
[2]); must_equal(fid(op
[1][0]), ref
[1])
263 must_equal(op
[2][0], initial
[0]); must_equal(fid(op
[2][0]), ref
[2])
264 must_equal(op
[3][0], initial
[3]); must_equal(fid(op
[3][0]), ref
[0])
265 pass; must_equal(fid(op
[4][0]), ref
[3])
266 must_equal(op
[5][0], initial
[4]); must_equal(fid(op
[5][0]), ref
[0])
267 for fd
, _
in op
: OS
.close(fd
)
269 ## (not testing detachtty and daemonize)
274 print(";; test completed OK")