| 1 | ### -*-python-*- |
| 2 | ### |
| 3 | ### Testing key-management functionality |
| 4 | ### |
| 5 | ### (c) 2019 Straylight/Edgeware |
| 6 | ### |
| 7 | |
| 8 | ###----- Licensing notice --------------------------------------------------- |
| 9 | ### |
| 10 | ### This file is part of the Python interface to Catacomb. |
| 11 | ### |
| 12 | ### Catacomb/Python is free software: you can redistribute it and/or |
| 13 | ### modify it under the terms of the GNU General Public License as |
| 14 | ### published by the Free Software Foundation; either version 2 of the |
| 15 | ### License, or (at your option) any later version. |
| 16 | ### |
| 17 | ### Catacomb/Python is distributed in the hope that it will be useful, but |
| 18 | ### WITHOUT ANY WARRANTY; without even the implied warranty of |
| 19 | ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| 20 | ### General Public License for more details. |
| 21 | ### |
| 22 | ### You should have received a copy of the GNU General Public License |
| 23 | ### along with Catacomb/Python. If not, write to the Free Software |
| 24 | ### Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, |
| 25 | ### USA. |
| 26 | |
| 27 | ###-------------------------------------------------------------------------- |
| 28 | ### Imported modules. |
| 29 | |
| 30 | import catacomb as C |
| 31 | import sys as SYS |
| 32 | import unittest as U |
| 33 | import testutils as T |
| 34 | import time as TM |
| 35 | |
| 36 | ###-------------------------------------------------------------------------- |
class TestKeyError (U.TestCase):
    """Checks on the `catacomb.KeyError' exception class."""

    def test_keyerror(me):
        ## Documented with comments rather than a docstring so that
        ## `unittest' reports keep naming the method.

        ## Provoke a real error: the file is opened without `KOPEN_WRITE',
        ## and adding a key is expected to fail with `KERR_READONLY'.  The
        ## exception is fetched through `sys.exc_info' rather than with
        ## `except ... as'.
        try:
            C.KeyFile("notexist", C.KOPEN_NOFILE).newkey(1, "foo")
        except C.KeyError:
            caught = SYS.exc_info()[1]
        else:
            me.fail("expected `catacomb.KeyError'")
        code = C.KERR_READONLY
        text = "Key file is read-only"
        me.assertEqual(caught.err, code)
        me.assertEqual(caught.errstring, text)
        me.assertEqual(caught.args, (code,))
        me.assertEqual(str(caught),
                       "KERR_READONLY (%d): Key file is read-only" % code)

        ## Direct construction: an error code is mandatory, and any extra
        ## argument is expected to be carried along in `args'.
        me.assertRaises(TypeError, C.KeyError)
        extra = ["TOKEN"]
        made = C.KeyError(C.KERR_DUPID, extra)
        me.assertEqual(made.err, C.KERR_DUPID)
        me.assertEqual(made.errstring, "Key id already exists")
        me.assertEqual(made.args, (C.KERR_DUPID, extra))
| 58 | ###-------------------------------------------------------------------------- |
class TestKeyFile (U.TestCase):
    """Exercise `catacomb.KeyFile' objects and the keys they contain."""

    def test_keyring(me):
        ## Read-only tour of the `t/keyring' fixture, one key at a time.
        ## (Comments rather than a docstring, so that `unittest' reports
        ## keep naming the method.)

        ring = C.KeyFile("t/keyring")

        ## Check basic attributes.
        me.assertEqual(ring.name, "t/keyring")
        me.assertEqual(ring.modifiedp, False)
        me.assertEqual(ring.writep, False)
        me.assertEqual(ring.filep, False)

        ## Check enumeration: four keys, one of each expected type.
        seen = set(key.type for key in T.itervalues(ring))
        me.assertEqual(seen, set(["rsa", "ec", "ec-param", "twofish"]))
        me.assertEqual(len(ring), 4)

        ## Start with `rsa', looked up by tag.
        rsa = ring.bytag("ron")
        me.assertEqual(rsa.type, "rsa")
        me.assertEqual(rsa.id, 0x8599dbab)
        me.assertEqual(type(rsa.data), C.KeyDataStructured)
        me.assertEqual(set(rsa.data), set(["e", "n", "private"]))
        locked = rsa.data["private"]
        me.assertEqual(type(locked), C.KeyDataEncrypted)
        me.assertRaises(C.KeyError, locked.unlock, T.bin("wrong secret"))
        opened = locked.unlock(T.bin("very secret"))
        me.assertEqual(type(opened), C.KeyDataStructured)
        me.assertEqual(set(opened),
                       set(["p", "q", "d", "d-mod-p", "d-mod-q", "q-inv"]))
        ## The public modulus must be the product of the two secret primes.
        me.assertEqual(rsa.data["n"].mp, opened["p"].mp*opened["q"].mp)

        ## This key has an attribute.  Poke about at it.
        attrs = rsa.attr
        me.assertEqual(len(attrs), 1)
        me.assertEqual(set(attrs), set(["attr"]))
        me.assertEqual(attrs["attr"], "value")
        me.assertRaises(KeyError, lambda: attrs["notexist"])
        me.assertEqual(attrs.get("attr"), "value")
        me.assertEqual(attrs.get("notexist"), None)

        ## Check fingerprinting while we're here.  For each filter, the
        ## library's fingerprint is compared with one rebuilt by hand from a
        ## fixed prefix, the key id and type, the encoded key data, and the
        ## attributes in sorted name order.
        for flt in ["-secret", "none"]:
            builtin = C.sha256()
            me.assertTrue(rsa.fingerprint(builtin, flt))
            want = builtin.done()
            manual = C.sha256()
            manual.hash(T.bin("catacomb-key-fingerprint:")) \
                  .hashu32(rsa.id) \
                  .hashbuf8(T.bin(rsa.type))
            manual.hash(rsa.data.encode(flt))
            for name in sorted(T.iterkeys(rsa.attr)):
                manual.hashbuf8(T.bin(name)).hashbuf16(T.bin(rsa.attr[name]))
            got = manual.done()
            me.assertEqual(want, got)

        ## Try `ec-param'.  This should be fairly easy.
        param = ring["ec-param"]
        me.assertEqual(param.tag, None)
        me.assertEqual(param.id, 0x4a4e1ee7)
        me.assertEqual(type(param.data), C.KeyDataStructured)
        me.assertEqual(set(param.data), set(["curve"]))
        curve = param.data["curve"]
        me.assertEqual(type(curve), C.KeyDataString)
        me.assertEqual(curve.str, "nist-p256")

        ## Check qualified-tag lookups.
        me.assertRaises(C.KeyError, ring.qtag, "notexist.curve")
        me.assertRaises(C.KeyError, ring.qtag, "ec-param.notexist")
        fulltag, owner, leaf = ring.qtag("ec-param.curve")
        me.assertEqual(fulltag, "4a4e1ee7:ec-param.curve")
        me.assertEqual(owner.type, "ec-param")
        me.assertEqual(type(leaf), C.KeyDataString)
        me.assertEqual(leaf.str, "nist-p256")

        ## Try `ec'.  A little trickier.
        ec = ring.bytype("ec")
        me.assertEqual(ec.tag, None)
        me.assertEqual(ec.id, 0xbd761d35)
        me.assertEqual(type(ec.data), C.KeyDataStructured)
        me.assertEqual(set(ec.data), set(["curve", "p", "private"]))
        curve = ec.data["curve"]
        me.assertEqual(type(curve), C.KeyDataString)
        me.assertEqual(curve.str, "nist-p256")
        einfo = C.eccurves[curve.str]
        me.assertEqual(type(ec.data["p"]), C.KeyDataECPt)
        pub = ec.data["p"].ecpt
        locked = ec.data["private"]
        me.assertEqual(type(locked), C.KeyDataEncrypted)
        me.assertRaises(C.KeyError, locked.unlock, T.bin("wrong secret"))
        opened = locked.unlock(T.bin("super secret"))
        me.assertEqual(type(opened), C.KeyDataStructured)
        me.assertEqual(set(opened), set(["x"]))
        x = opened["x"].mp
        ## The public point must be the secret scalar times the generator.
        me.assertEqual(x*einfo.G, pub)

        ## Finish with `twofish'.
        fish = ring.byid(0x60090be2)
        me.assertEqual(fish.tag, None)
        me.assertEqual(fish.type, "twofish")
        me.assertEqual(type(fish.data), C.KeyDataEncrypted)
        me.assertRaises(C.KeyError, fish.data.unlock, T.bin("wrong secret"))
        raw = fish.data.unlock(T.bin("not secret"))
        me.assertEqual(type(raw), C.KeyDataBinary)
        me.assertEqual(raw.bin, C.bytes("d337b98eea24425826df202a6a3d1ef8"
                                        "377b71923fe1179451564776da29bb84"))

        ## Check unsuccessful searches.  Note the expectations differ:
        ## `bytag' and `bytype' answer `None' on a miss, whereas indexing and
        ## `byid' are expected to raise.
        me.assertRaises(KeyError, lambda: ring["notexist"])
        me.assertEqual(ring.bytag("notexist"), None)
        me.assertEqual(ring.bytag(12345), None)
        me.assertEqual(ring.bytype("notexist"), None)
        me.assertRaises(TypeError, ring.bytype, 12345)
        me.assertRaises(C.KeyError, ring.byid, 0x12345678)

        ## The keyring should be readonly.  (The key poked at here is the
        ## `twofish' one found just above.)
        me.assertRaises(C.KeyError, ring.newkey, 0x12345678, "fail")
        me.assertRaises(C.KeyError, setattr, fish, "tag", "foo")
        me.assertRaises(C.KeyError, delattr, fish, "tag")
        me.assertRaises(C.KeyError,
                        setattr, fish, "data", C.KeyDataString("foo"))

    def test_keywrite(me):
        ## Create a key in a writable keyfile and modify its properties.

        scratch = C.KeyFile("test", C.KOPEN_WRITE | C.KOPEN_NOFILE)
        me.assertEqual(scratch.modifiedp, False)
        now = int(TM.time())
        expiry = now + 86400

        ## Adding a key must mark the file as modified.
        key = scratch.newkey(0x11111111, "first", expiry)
        me.assertEqual(scratch.modifiedp, True)

        ## Expiry and deletion times both start out at the given expiry.
        me.assertEqual(scratch[0x11111111].id, 0x11111111)
        me.assertEqual(key.exptime, expiry)
        me.assertEqual(key.deltime, expiry)
        me.assertRaises(ValueError, setattr, key, "deltime", C.KEXP_FOREVER)
        key.exptime = expiry + 5

        ## A new key carries placeholder data until some is assigned.
        me.assertEqual(key.data.str, "<unset>")
        value = 9876543210
        key.data = C.KeyDataMP(value)
        me.assertEqual(key.data.mp, value)

        ## The comment may be set, then cleared either by assigning `None'
        ## or by deleting the attribute.
        me.assertEqual(key.comment, None)
        note = ";; just a test"
        key.comment = note
        me.assertEqual(key.comment, note)
        key.comment = None
        me.assertEqual(key.comment, None)
        key.comment = note
        me.assertEqual(key.comment, note)
        del key.comment
        me.assertEqual(key.comment, None)
| 207 | ###-------------------------------------------------------------------------- |
| 208 | |
def keydata_equalp(kd0, kd1):
    """Return whether key-data objects KD0 and KD1 hold equal contents.

    Objects of different types are never equal.  Leaf objects are compared
    through their single payload attribute.  `KeyDataStructured' objects are
    equal when they have the same length and every entry of KD0 has an equal
    counterpart under the same tag in KD1 (checked recursively).

    Raises `SystemError' if the common type is not a known key-data class.
    """
    ty = type(kd0)
    if ty is not type(kd1): return False

    ## Leaf classes, each paired with the attribute holding its payload.
    leaves = [(C.KeyDataBinary, "bin"),
              (C.KeyDataMP, "mp"),
              (C.KeyDataEncrypted, "ct"),
              (C.KeyDataECPt, "ecpt"),
              (C.KeyDataString, "str")]
    for cls, attr in leaves:
        if ty is cls: return getattr(kd0, attr) == getattr(kd1, attr)

    if ty is not C.KeyDataStructured:
        raise SystemError("unexpected keydata type")

    ## Structured data: equal lengths plus a match for each of KD0's entries
    ## means the two sets of tags coincide.
    if len(kd0) != len(kd1): return False
    for tag, sub0 in T.iteritems(kd0):
        try: sub1 = kd1[tag]
        except KeyError: return False
        if not keydata_equalp(sub0, sub1): return False
    return True
| 225 | |
class TestKeyData (U.TestCase):
    """Tests for the `catacomb.KeyData' family of classes."""

    ## Test methods are described with comments rather than docstrings so
    ## that `unittest' reports keep naming the methods.

    def test_flags(me):
        ## Conversion between textual flag descriptions and flag words.  The
        ## expected `readflags' results below are triples; apparently (flag
        ## bits, mask, unparsed remainder) -- TODO confirm against the docs.
        readflags = C.KeyData.readflags
        me.assertEqual(readflags("none"), (0, 0, ""))
        me.assertEqual(readflags("ec,public:..."),
                       (C.KENC_EC | C.KCAT_PUB,
                        C.KF_ENCMASK | C.KF_CATMASK,
                        ":..."))
        me.assertEqual(readflags("int,burn"),
                       (C.KENC_MP | C.KF_BURN,
                        C.KF_ENCMASK | C.KF_BURN,
                        ""))

        ## Malformed descriptions must be rejected.
        me.assertRaises(C.KeyError, readflags, "int,burn?")
        me.assertRaises(C.KeyError, readflags, "int,ec")
        me.assertRaises(C.KeyError, readflags, "snork")

        writeflags = C.KeyData.writeflags
        me.assertEqual(writeflags(0), "binary,symmetric")
        me.assertEqual(writeflags(C.KENC_EC | C.KCAT_PUB), "ec,public")

    def test_misc(me):
        ## Copying key data, optionally through a filter.
        whole = C.KeyDataStructured({ "a": C.KeyDataString("foo", "public"),
                                      "b": C.KeyDataMP(12345, "private"),
                                      "c": C.KeyDataString("bar", "public") })

        ## An unfiltered copy keeps every entry.
        dup = whole.copy()
        me.assertEqual(type(dup), C.KeyDataStructured)
        me.assertEqual(set(T.iterkeys(dup)), set(["a", "b", "c"]))

        ## Copying a leaf through a filter; the result is not inspected, so
        ## this only checks that the call goes through.
        dup = C.KeyDataMP(12345, C.KCAT_PRIV).copy("private")

        ## A textual filter: the private entry `b' is left out.
        dup = whole.copy("-secret")
        me.assertEqual(type(dup), C.KeyDataStructured)
        me.assertEqual(set(T.iterkeys(dup)), set(["a", "c"]))

        ## A filter given as a pair: only `b' is kept.
        dup = whole.copy((0, C.KF_NONSECRET))
        me.assertEqual(type(dup), C.KeyDataStructured)
        me.assertEqual(set(T.iterkeys(dup)), set(["b"]))

    def check_encode(me, kd):
        ## Helper: KD must survive both the `encode'/`decode' and the
        ## `write'/`read' round trips, the latter leaving nothing unparsed.
        decoded = C.KeyData.decode(kd.encode())
        me.assertTrue(keydata_equalp(decoded, kd))
        reread, rest = C.KeyData.read(kd.write())
        me.assertEqual(rest, "")
        me.assertTrue(keydata_equalp(kd, reread))

    def test_bin(me):
        gen = T.detrand("kd-bin")
        blob = gen.block(16)
        item = C.KeyDataBinary(blob, "symm,burn")
        me.assertEqual(item.bin, blob)
        me.check_encode(item)

    def test_mp(me):
        gen = T.detrand("kd-mp")
        number = gen.mp(128)
        item = C.KeyDataMP(number, "symm,burn")
        me.assertEqual(item.mp, number)
        me.check_encode(item)

    def test_string(me):
        text = "some random string"
        item = C.KeyDataString(text, "symm,burn")
        me.assertEqual(item.str, text)
        me.check_encode(item)

    def test_enc(me):
        gen = T.detrand("kd-enc")
        ciphertext = gen.block(16)
        item = C.KeyDataEncrypted(ciphertext, "symm")
        me.assertEqual(item.ct, ciphertext)
        me.check_encode(item)

    def test_ecpt(me):
        gen = T.detrand("kd-ec")
        point = C.ECPt(gen.mp(128), gen.mp(128))
        item = C.KeyDataECPt(point, "symm,burn")
        me.assertEqual(item.ecpt, point)
        me.check_encode(item)

    def test_struct(me):
        ## The generator is not used below; the call is kept so that the
        ## test behaves exactly as before.
        rng = T.detrand("kd-struct")
        tags = ["a", "b", "c", "d"]
        item = C.KeyDataStructured(dict((t, C.KeyDataString(t))
                                        for t in tags))
        for t in tags: me.assertEqual(item[t].str, t)
        me.assertEqual(len(item), 4)
        me.check_encode(item)

        ## Entries have to be key-data objects, not plain strings.
        me.assertRaises(TypeError, C.KeyDataStructured, { "a": "a" })
| 311 | |
| 312 | ###-------------------------------------------------------------------------- |
| 313 | ### Mappings. |
| 314 | |
## NOTE(review): the base class is spelled `TextMixin', unlike the
## `MutableMappingTestMixin' used by `TestKeyAttrMapping'; presumably that is
## the name `testutils' really exports -- confirm before `correcting' it.
class TestKeyFileMapping (T.ImmutableMappingTextMixin):
    """Check that a `KeyFile' behaves as an immutable mapping."""

    ## Hooks used by the mixin (their exact contract lives in `testutils').
    ## Keys pass through unchanged; a value is represented by the integer
    ## held in its key data.

    def _mkkey(me, i):
        return i

    def _getkey(me, k):
        return k

    def _getvalue(me, v):
        return v.data.mp

    def test_keyfile(me):
        ## Fill a scratch keyfile with keys 1, 2, 3 holding 101, 102, 103,
        ## keep a plain dictionary alongside as the model, and hand both to
        ## the generic mapping checks.
        kf = C.KeyFile("test", C.KOPEN_WRITE | C.KOPEN_NOFILE)
        model = {}
        for ident in [1, 2, 3]:
            model[ident] = 100 + ident
            fresh = kf.newkey(ident, "k#%d" % ident)
            fresh.data = C.KeyDataMP(100 + ident)

        me.check_immutable_mapping(kf, model)
| 328 | |
class TestKeyAttrMapping (T.MutableMappingTestMixin):
    """Check that a key's attribute map behaves as a mutable mapping."""

    def test_attrmap(me):
        ## Comments rather than a docstring, so that `unittest' reports keep
        ## naming the method.

        def mkmap():
            ## Each call makes a new scratch keyfile holding a single key,
            ## and returns that key's attribute map.
            scratch = C.KeyFile("test", C.KOPEN_WRITE | C.KOPEN_NOFILE)
            return scratch.newkey(0x12345678, "test-key").attr

        ## Run the generic checks, which take the factory itself.
        me.check_mapping(mkmap)

        ## Updating with non-string keys and values must be refused.
        attrs = mkmap()
        me.assertRaises(TypeError, attrs.update, { 3: 3, 4: 5 })
| 340 | |
| 341 | ###----- That's all, folks -------------------------------------------------- |
| 342 | |
## Run the tests when this file is executed as a script.
if __name__ == "__main__":
    U.main()