Commit | Line | Data |
---|---|---|
8eabb4ff MW |
1 | /* -*-scala-*- |
2 | * | |
3 | * Managing TrIPE administration connections | |
4 | * | |
5 | * (c) 2018 Straylight/Edgeware | |
6 | */ | |
7 | ||
8 | /*----- Licensing notice --------------------------------------------------* | |
9 | * | |
10 | * This file is part of the Trivial IP Encryption (TrIPE) Android app. | |
11 | * | |
12 | * TrIPE is free software: you can redistribute it and/or modify it under | |
13 | * the terms of the GNU General Public License as published by the Free | |
14 | * Software Foundation; either version 3 of the License, or (at your | |
15 | * option) any later version. | |
16 | * | |
17 | * TrIPE is distributed in the hope that it will be useful, but WITHOUT | |
18 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | |
19 | * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License | |
20 | * for more details. | |
21 | * | |
22 | * You should have received a copy of the GNU General Public License | |
23 | * along with TrIPE. If not, see <https://www.gnu.org/licenses/>. | |
24 | */ | |
25 | ||
26 | package uk.org.distorted.tripe; package object admin { | |
27 | ||
28 | /*----- Imports -----------------------------------------------------------*/ | |
29 | ||
3bb2303d | 30 | import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter}; |
8eabb4ff MW |
31 | import java.util.concurrent.locks.{Condition, ReentrantLock => Lock}; |
32 | ||
0157de02 | 33 | import scala.collection.mutable.HashMap; |
8eabb4ff MW |
34 | import scala.concurrent.Channel; |
35 | import scala.util.control.Breaks; | |
36 | ||
25c35469 | 37 | import Implicits._; |
3bb2303d | 38 | import sys.{serverInput, serverOutput}; |
8eabb4ff MW |
39 | |
40 | /*----- Classification of server messages ---------------------------------*/ | |
41 | ||
/* Root of the hierarchy: anything `parseServerLine' can produce. */
sealed abstract class Message;

/* Replies belonging to one particular job. */
sealed abstract class JobMessage extends Message;

/* The job completed successfully (`OK' or `BGOK'). */
case object JobOK extends JobMessage;

/* One line of output from the job (`INFO' or `BGINFO'). */
final case class JobInfo(info: Seq[String]) extends JobMessage;

/* The job failed; `err' holds the tokens following `FAIL' or `BGFAIL'. */
final case class JobFail(err: Seq[String]) extends JobMessage;

/* Not sent by the server: written to every outstanding job by the reader
 * thread when it stops reading.
 */
case object JobLostConnection extends JobMessage;

/* A job message carrying the tag of the background job it belongs to. */
final case class BackgroundJobMessage(tag: String, msg: JobMessage)
  extends Message;

/* `BGDETACH': the foreground job has gone into the background as `tag'. */
final case class JobDetached(tag: String) extends Message;

/* Messages which aren't replies to any job; these are passed to the hook. */
sealed abstract class AsyncMessage extends Message;

/* `TRACE': the rest of the line, kept as a single untokenized string. */
final case class Trace(msg: String) extends AsyncMessage;

/* `WARN': the tokens of a warning. */
final case class Warning(err: Seq[String]) extends AsyncMessage;

/* `NOTE': the tokens of a notification. */
final case class Notify(note: Seq[String]) extends AsyncMessage;

/* Not sent by the server: announced through the hook when the reader
 * thread finishes.
 */
case object ConnectionLost extends AsyncMessage;

/* Service-related messages; the reader thread currently ignores them. */
sealed abstract class ServiceMessage extends Message;

/* `SVCCANCEL' with a job id. */
final case class ServiceCancel(jobid: String) extends ServiceMessage;

/* `SVCCLAIM' with a service name and version. */
final case class ServiceClaim(svc: String, version: String)
  extends ServiceMessage;

/* `SVCJOB' with a job id, service name, command and arguments. */
final case class ServiceJob(
  jobid: String, svc: String, cmd: String, args: Seq[String])
  extends ServiceMessage;
67 | ||
68 | /*----- Main code ---------------------------------------------------------*/ | |
69 | ||
8eabb4ff MW |
/* Thrown by `submit' when the connection is no longer alive. */
class ConnectionClosed extends Exception;

/* The server (or its output) broke our expectations of the protocol. */
class ServerFailed(msg: String) extends Exception(msg);

/* A command failed: `msg' holds the tokens from the server's `FAIL' (or
 * `BGFAIL') reply.  The exception message is the class name followed by
 * those tokens, quoted, in parentheses.
 */
class CommandFailed(val msg: Seq[String]) extends Exception {
  override def getMessage(): String =
    s"${getClass.getName}(${quoteTokens(msg)})";
}

/* Thrown while reading a job's replies if the connection was lost. */
class ConnectionLostException extends Exception;
80 | ||
0157de02 | 81 | object Connection extends Hook[AsyncMessage] |
8eabb4ff MW |
82 | { |
83 | /* Synchronization. | |
84 | * | |
85 | * This class is complicatedly multithreaded. The following fields must | |
86 | * only be accessed while the instance is locked. To prevent deadlocks, | |
87 | * hold the `Connection' lock before locking any individual `Job' objects. | |
88 | */ | |
89 | ||
3bb2303d MW |
/* Is this connection still alive? */
private var livep: Boolean = true;

/* Foreground job, if there is one. */
private var fgjob: Option[this.Job] = None;

/* Maps tags to extant (background) jobs. */
private val jobmap = HashMap.empty[String, this.Job];

/* Sequence number used to build the next background job tag. */
private var bgseq: Int = 0;

/* Character-level reader and writer wrapped around the server's streams. */
private val in = new BufferedReader(new InputStreamReader(serverInput));
private val out = new OutputStreamWriter(serverOutput);

/* NOTE(review): presumably the publisher type wanted by `Hook' -- confirm. */
type Pub = Connection.type;
fd8dac14 | 99 | |
8eabb4ff MW |
100 | class Job extends Iterator[Seq[String]] { |
101 | private[Connection] val ch = new Channel[JobMessage]; | |
102 | private[this] var nextmsg: Option[JobMessage] = None; | |
103 | ||
/* Ensure that `nextmsg' holds a message, reading one from the job's
 * channel if nothing is buffered yet.
 */
private[this] def fetchNext(): Unit =
  if (nextmsg.isEmpty) nextmsg = Some(ch.read);

/* There is something to fetch unless the pending message is `JobOK'.
 * Note that a pending failure or lost connection counts as `something':
 * it is reported by `next' as an exception.
 */
override def hasNext: Boolean = {
  fetchNext();
  !nextmsg.contains(JobOK)
}

/* Return the next line of output from the job.
 *
 * Throws `CommandFailed' if the job failed, `ConnectionLostException' if
 * the connection went away, and `NoSuchElementException' if the job has
 * already completed successfully.
 */
override def next(): Seq[String] = {
  fetchNext();
  nextmsg match {
    case Some(JobInfo(line)) =>
      nextmsg = None;                   // consumed: read afresh next time
      line
    case Some(JobFail(why)) => throw new CommandFailed(why);
    case Some(JobLostConnection) => throw new ConnectionLostException;
    case Some(JobOK) => throw new NoSuchElementException;
    case None => unreachable;           // `fetchNext' always fills the slot
  }
}
7894831e | 123 | |
8eabb4ff MW |
/* Collect the job's entire output as a map.
 *
 * Every token on every line must have the form `KEY=VALUE'; the split is
 * made at the first `='.  Throws `ServerFailed' on a token without one.
 * If a key appears more than once, the last value wins.
 */
def keyvals(): Map[String, String] = {
  val pairs = for (line <- this; token <- line) yield {
    val split = token.indexOf('=');
    if (split < 0) throw new ServerFailed("missing `=' in key-value list");
    token.substring(0, split) -> token.substring(split + 1)
  };
  pairs.toMap
}
7894831e | 137 | |
8eabb4ff MW |
/* Parse the job's output as a list of trace-style option descriptions.
 *
 * Each line must start with a key token: either a single character, or a
 * single character followed by `+'.  The remaining tokens are joined with
 * spaces to form the description.  The result holds, for each line, the
 * key character, whether the `+' was present, and the description.
 *
 * Throws `ServerFailed' if a line is empty or its key is malformed.
 */
def traceish(): Seq[(Char, Boolean, String)] = {
  val b = Seq.newBuilder[(Char, Boolean, String)];
  for (line <- this) line match {
    /* Match on `Seq', not `List': the lines are only known to be some
     * `Seq[String]', and a `List' pattern would send every other kind of
     * sequence to the `empty line' case below.
     */
    case Seq(key, desc@_*) =>
      val live = if (key.length == 1) false
                 else if (key.length == 2 && key(1) == '+') true
                 else throw new ServerFailed(
                   s"incomprehensible traceish key `$key'");
      b += ((key(0), live, desc.mkString(" ")));
    case _ => throw new ServerFailed("empty line in traceish output");
  }
  b.result()
}
7894831e | 151 | |
8eabb4ff MW |
/* Check that the job completed without producing any output.
 *
 * `hasNext' is also true when the pending message is a failure or a lost
 * connection, so fetch the message with `next' first: that raises
 * `CommandFailed' or `ConnectionLostException' as appropriate, rather
 * than hiding the real cause behind a generic complaint.  Only if the
 * server really did send an output line do we throw `ServerFailed'.
 */
def expectEmpty(): Unit = {
  if (hasNext) {
    next();
    throw new ServerFailed("no output expected");
  }
}
8eabb4ff MW |
155 | |
/* Return the job's only line of output.
 *
 * Throws `ServerFailed' if there is no output, or if anything other than
 * successful completion follows the first line.  Exceptions from `next'
 * (e.g., `CommandFailed') pass through untouched.
 */
def oneLine(): Seq[String] = {
  def complain(): Nothing =
    throw new ServerFailed("exactly one line expected");

  if (!hasNext) complain();
  val only = next();
  if (hasNext) complain();
  only
}
163 | } | |
164 | ||
/* Send a command to the server and return the job which collects its
 * replies.
 *
 * `toks' is the command name followed by its arguments; it must not be
 * empty.  If `bg' is set, a fresh tag is allocated and `-background TAG'
 * is inserted after the command name.
 *
 * Only one foreground job can be outstanding at a time, so this blocks
 * until the previous one has been released.  Throws
 * `IllegalArgumentException' if `toks' is empty, `ConnectionClosed' if the
 * connection is no longer alive, and passes on anything thrown while
 * writing the command.
 */
def submit(bg: Boolean, toks: String*): this.Job = {
  /* An empty command used to die with a `MatchError' (background) or send
   * a blank line (foreground); reject it before touching any state.
   */
  require(toks.nonEmpty, "empty command");

  println(";; wait for lock");
  synchronized {
    val cmd =
      if (!bg) toks
      else {
        val tag = "J%05d".format(bgseq); bgseq += 1;
        toks.head +: "-background" +: tag +: toks.tail
      };

    println(";; wait for foreground");
    while (livep && fgjob.isDefined) wait();
    if (!livep) throw new ConnectionClosed;

    println(";; write command");
    try { out.write(quoteTokens(cmd)); out.write('\n'); out.flush(); }
    catch {
      /* We aren't going to claim the foreground slot after all, so pass
       * the wakeup on to another waiting thread before giving up.
       */
      case e: Throwable => notify(); throw e;
    }

    /* Claim the foreground slot while still holding the lock, so the
     * reader thread can't see the reply before the job exists.
     */
    val j = new Job;
    fgjob = Some(j);
    j
  }
}

/* Submit a foreground command; see the two-argument form. */
def submit(toks: String*): this.Job = submit(false, toks: _*);
188 | ||
8eabb4ff MW |
/* Both of these must be called with the connection lock held. */

/* Return the current foreground job; throws `ServerFailed' if there
 * isn't one.
 */
def foregroundJob: Job = fgjob match {
  case Some(j) => j
  case None => throw new ServerFailed("no foreground job");
}

/* Give up the foreground slot and wake one thread waiting on the
 * connection.
 */
def releaseForegroundJob(): Unit = {
  fgjob = None;
  notify();
}
193 | ||
/* Classify a line `s' received from the server.
 *
 * The first token selects the kind of message.  `TRACE' keeps the rest of
 * the line as a single string; everything else is split into tokens and
 * checked against the shape expected for its keyword.  Throws
 * `ServerFailed' if the line is empty or doesn't fit any known shape.
 */
def parseServerLine(s: String): Message = {
  val (code, next) = nextToken(s).getOrElse {
    throw new ServerFailed("empty line from server")
  };

  if (code == "TRACE") Trace(s.substring(next))
  else {
    val tail = splitTokens(s, next);
    (code, tail) match {
      /* Replies to the foreground job. */
      case ("OK", Seq()) => JobOK
      case ("INFO", _) => JobInfo(tail)
      case ("FAIL", _) => JobFail(tail)

      /* Replies to background jobs: the first token is the job's tag. */
      case ("BGDETACH", Seq(tag)) => JobDetached(tag)
      case ("BGOK", Seq(tag)) => BackgroundJobMessage(tag, JobOK)
      case ("BGINFO", Seq(tag, rest@_*)) =>
        BackgroundJobMessage(tag, JobInfo(rest))
      case ("BGFAIL", Seq(tag, rest@_*)) =>
        BackgroundJobMessage(tag, JobFail(rest))

      /* Asynchronous broadcasts. */
      case ("WARN", _) => Warning(tail)
      case ("NOTE", _) => Notify(tail)

      /* Service messages. */
      case ("SVCCLAIM", Seq(svc, ver)) => ServiceClaim(svc, ver)
      case ("SVCJOB", Seq(tag, svc, cmd, args@_*)) =>
        ServiceJob(tag, svc, cmd, args)
      case ("SVCCANCEL", Seq(tag)) => ServiceCancel(tag)

      case _ => throw new ServerFailed(
        "incomprehensible line from server: " + quoteTokens(code +: tail))
    }
  }
}
217 | ||
8eabb4ff MW |
/* Deliver `msg' to the job it belongs to.
 *
 * `getjob' is called with the connection lock held and must return the
 * job in question.  Its argument says whether the job is to be kept: true
 * only if `msg' is a `JobInfo'.  The message is written to the job's
 * channel after the lock has been dropped.
 */
def processJobMessage(msg: JobMessage)
                     (getjob: (Boolean) => Job): Unit = {
  val keep = msg match {
    case _: JobInfo => true
    case _ => false
  };
  val job = synchronized { getjob(keep) };
  job.ch.write(msg);
}
8eabb4ff MW |
222 | |
/* Reading lines from the server.
 *
 * The reader thread parses each line and dispatches it:
 *
 *   * `BGDETACH' files the foreground job in `jobmap' under its tag and
 *     frees the foreground slot;
 *   * job replies go to the foreground job, or to the background job with
 *     the matching tag, and anything other than `JobInfo' retires the job;
 *   * asynchronous messages are passed to the hook; and
 *   * service messages are ignored.
 *
 * When reading stops -- end of input or any exception, which is printed
 * and otherwise swallowed -- the connection is marked dead, every
 * outstanding job is sent `JobLostConnection', and `ConnectionLost' is
 * announced through the hook.
 */
val readthr = thread("admin reader") {
  println(";; readthr running");

  try {
    println(";; wait for line");
    /* `in' is already a `BufferedReader', so read lines from it directly;
     * `readLine' returns null at end of input.
     */
    for (line <- Iterator.continually(in.readLine()).takeWhile(_ != null)) {
      println(s";; line: $line");
      parseServerLine(line) match {
        case JobDetached(tag) => synchronized {
          jobmap(tag) = foregroundJob; releaseForegroundJob();
        }
        case msg: JobMessage => processJobMessage(msg) { keep =>
          val j = foregroundJob; if (!keep) releaseForegroundJob(); j
        }
        case BackgroundJobMessage(tag, msg) =>
          processJobMessage(msg) { keep =>
            val j = jobmap.getOrElse(tag, throw new ServerFailed(
              s"no job with tag `${tag}'"));
            if (!keep) jobmap.remove(tag);
            j
          }
        case msg: AsyncMessage =>
          callHook(msg);
        case _: ServiceMessage =>
          ok;
      }
    }
  } catch {
    case e: Throwable => e.printStackTrace();
  } finally {
    synchronized {
      livep = false;

      /* Tell everybody still waiting for replies that none are coming,
       * and drop our references to the now-dead jobs.
       */
      for ((_, j) <- jobmap) j.ch.write(JobLostConnection);
      jobmap.clear();
      fgjob.foreach { j => j.ch.write(JobLostConnection); };
      fgjob = None;

      /* Wake every thread blocked in `submit', whether or not there was a
       * foreground job.  The slot is released with a single `notify', so
       * other submitters may still be asleep even though the slot is free;
       * left alone they would never notice that the connection has died.
       */
      notifyAll();
    }
    callHook(ConnectionLost);
  }
}
273 | } | |
274 | ||
275 | /*----- That's all, folks -------------------------------------------------*/ | |
276 | ||
7894831e | 277 | } |