1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.orekit.gnss.metric.ntrip;
18
19 import java.io.BufferedReader;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.InputStreamReader;
23 import java.net.Authenticator;
24 import java.net.HttpURLConnection;
25 import java.net.InetAddress;
26 import java.net.InetSocketAddress;
27 import java.net.Proxy;
28 import java.net.Proxy.Type;
29 import java.net.SocketAddress;
30 import java.net.URL;
31 import java.net.URLConnection;
32 import java.net.UnknownHostException;
33 import java.nio.charset.StandardCharsets;
34 import java.util.ArrayList;
35 import java.util.Formatter;
36 import java.util.HashMap;
37 import java.util.List;
38 import java.util.Locale;
39 import java.util.Map;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.Executors;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicReference;
44
45 import org.orekit.errors.OrekitException;
46 import org.orekit.errors.OrekitMessages;
47 import org.orekit.gnss.metric.messages.ParsedMessage;
48
49
50
51
52
53
54
55
56
57
58 public class NtripClient {
59
60
61 public static final int DEFAULT_TIMEOUT = 10000;
62
63
64 public static final int DEFAULT_PORT = 2101;
65
66
67 public static final double DEFAULT_RECONNECT_DELAY = 1.0;
68
69
70 public static final double DEFAULT_RECONNECT_DELAY_FACTOR = 1.5;
71
72
73 public static final int DEFAULT_MAX_RECONNECT = 20;
74
75
76 private static final String HOST_HEADER_KEY = "Host";
77
78
79 private static final String USER_AGENT_HEADER_KEY = "User-Agent";
80
81
82 private static final String USER_AGENT_HEADER_VALUE = "NTRIP orekit/11.0";
83
84
85 private static final String VERSION_HEADER_KEY = "Ntrip-Version";
86
87
88 private static final String VERSION_HEADER_VALUE = "Ntrip/2.0";
89
90
91 private static final String CONNECTION_HEADER_KEY = "Connection";
92
93
94 private static final String CONNECTION_HEADER_VALUE = "close";
95
96
97 private static final String FLAGS_HEADER_KEY = "Ntrip-Flags";
98
99
100 private static final String SOURCETABLE_CONTENT_TYPE = "gnss/sourcetable";
101
102
103 private static final double DEG_TO_MINUTES = 60.0;
104
105
106 private final String host;
107
108
109 private final int port;
110
111
112 private double reconnectDelay;
113
114
115 private double reconnectDelayFactor;
116
117
118 private int maxRetries;
119
120
121 private int timeout;
122
123
124 private Proxy proxy;
125
126
127 private AtomicReference<String> gga;
128
129
130 private final List<ObserverHolder> observers;
131
132
133 private final Map<String, StreamMonitor> monitors;
134
135
136 private SourceTable sourceTable;
137
138
139 private ExecutorService executorService;
140
141
142
143
144
145
146
147
148
149
150 public NtripClient(final String host, final int port) {
151 this.host = host;
152 this.port = port;
153 this.observers = new ArrayList<>();
154 this.monitors = new HashMap<>();
155 setTimeout(DEFAULT_TIMEOUT);
156 setReconnectParameters(DEFAULT_RECONNECT_DELAY,
157 DEFAULT_RECONNECT_DELAY_FACTOR,
158 DEFAULT_MAX_RECONNECT);
159 setProxy(Type.DIRECT, null, -1);
160 this.gga = new AtomicReference<String>(null);
161 this.sourceTable = null;
162 this.executorService = null;
163 }
164
165
166
167
168 public String getHost() {
169 return host;
170 }
171
172
173
174
175 public int getPort() {
176 return port;
177 }
178
179
180
181
182 public void setTimeout(final int timeout) {
183 this.timeout = timeout;
184 }
185
186
187
188
189
190
191 public void setReconnectParameters(final double delay,
192 final double delayFactor,
193 final int max) {
194 this.reconnectDelay = delay;
195 this.reconnectDelayFactor = delayFactor;
196 this.maxRetries = max;
197 }
198
199
200
201
202
203
204 public void setProxy(final Proxy.Type type, final String proxyHost, final int proxyPort) {
205 try {
206 if (type == Proxy.Type.DIRECT) {
207
208 proxy = Proxy.NO_PROXY;
209 } else {
210
211 final InetAddress hostAddress = InetAddress.getByName(proxyHost);
212 final SocketAddress proxyAddress = new InetSocketAddress(hostAddress, proxyPort);
213 proxy = new Proxy(type, proxyAddress);
214 }
215 } catch (UnknownHostException uhe) {
216 throw new OrekitException(uhe, OrekitMessages.UNKNOWN_HOST, proxyHost);
217 }
218 }
219
220
221
222
223 public Proxy getProxy() {
224 return proxy;
225 }
226
227
228
229
230
231
232
233
234
235
236 public void setFix(final int hour, final int minute, final double second,
237 final double latitude, final double longitude, final double ellAltitude,
238 final double undulation) {
239
240
241 final double latDeg = Math.abs(Math.toDegrees(latitude));
242 final int dLat = (int) Math.floor(latDeg);
243 final double mLat = DEG_TO_MINUTES * (latDeg - dLat);
244 final char cLat = latitude >= 0.0 ? 'N' : 'S';
245
246
247 final double lonDeg = Math.abs(Math.toDegrees(longitude));
248 final int dLon = (int) Math.floor(lonDeg);
249 final double mLon = DEG_TO_MINUTES * (lonDeg - dLon);
250 final char cLon = longitude >= 0.0 ? 'E' : 'W';
251
252
253 final StringBuilder builder = new StringBuilder(82);
254 try (Formatter formatter = new Formatter(builder, Locale.US)) {
255
256
257 final int fixQuality = 1;
258 final int nbSat = 4;
259 final double hdop = 1.0;
260
261
262 formatter.format("$GPGGA,%02d%02d%06.3f,%02d%07.4f,%c,%02d%07.4f,%c,%1d,%02d,%3.1f,%.1f,M,%.1f,M,,",
263 hour, minute, second,
264 dLat, mLat, cLat, dLon, mLon, cLon,
265 fixQuality, nbSat, hdop,
266 ellAltitude, undulation);
267
268
269 byte sum = 0;
270 for (int i = 1; i < builder.length(); ++i) {
271 sum ^= builder.charAt(i);
272 }
273 formatter.format("*%02X", sum);
274
275 }
276 gga.set(builder.toString());
277
278 }
279
280
281
282
283 String getGGA() {
284 return gga.get();
285 }
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300 public void addObserver(final int typeCode, final String mountPoint,
301 final MessageObserver observer) {
302
303
304 observers.add(new ObserverHolder(typeCode, mountPoint, observer));
305
306
307 for (Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
308 if (mountPoint == null || mountPoint.equals(entry.getKey())) {
309 entry.getValue().addObserver(typeCode, observer);
310 }
311 }
312
313 }
314
315
316
317
318 public SourceTable getSourceTable() {
319 if (sourceTable == null) {
320 try {
321
322
323 final HttpURLConnection connection = connect("");
324
325 final int responseCode = connection.getResponseCode();
326 if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
327 throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, "caster");
328 } else if (responseCode != HttpURLConnection.HTTP_OK) {
329 throw new OrekitException(OrekitMessages.CONNECTION_ERROR, host, connection.getResponseMessage());
330 }
331
332
333 if (!SOURCETABLE_CONTENT_TYPE.equals(connection.getContentType())) {
334 throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());
335 }
336
337 final SourceTable table = new SourceTable(getHeaderValue(connection, FLAGS_HEADER_KEY));
338
339
340 try (InputStream is = connection.getInputStream();
341 InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
342 BufferedReader br = new BufferedReader(isr)) {
343 int lineNumber = 0;
344 for (String line = br.readLine(); line != null; line = br.readLine()) {
345
346 ++lineNumber;
347 line = line.trim();
348 if (line.length() == 0) {
349 continue;
350 }
351
352 if (line.startsWith(RecordType.CAS.toString())) {
353 table.addCasterRecord(new CasterRecord(line));
354 } else if (line.startsWith(RecordType.NET.toString())) {
355 table.addNetworkRecord(new NetworkRecord(line));
356 } else if (line.startsWith(RecordType.STR.toString())) {
357 table.addDataStreamRecord(new DataStreamRecord(line));
358 } else if (line.startsWith("ENDSOURCETABLE")) {
359
360 break;
361 } else {
362 throw new OrekitException(OrekitMessages.SOURCETABLE_PARSE_ERROR,
363 connection.getURL().getHost(), lineNumber, line);
364 }
365
366 }
367 }
368
369 sourceTable = table;
370 return table;
371
372 } catch (IOException ioe) {
373 throw new OrekitException(ioe, OrekitMessages.CANNOT_PARSE_SOURCETABLE, host);
374 }
375 }
376
377 return sourceTable;
378
379 }
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398 public void startStreaming(final String mountPoint, final org.orekit.gnss.metric.ntrip.Type type,
399 final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes) {
400
401 if (executorService == null) {
402
403 executorService = Executors.newFixedThreadPool(getSourceTable().getDataStreams().size());
404 }
405
406
407 if (monitors.containsKey(mountPoint)) {
408 throw new OrekitException(OrekitMessages.MOUNPOINT_ALREADY_CONNECTED, mountPoint);
409 }
410
411
412 final StreamMonitor monitor = new StreamMonitor(this, mountPoint, type, requiresNMEA, ignoreUnknownMessageTypes,
413 reconnectDelay, reconnectDelayFactor, maxRetries);
414 monitors.put(mountPoint, monitor);
415
416
417 for (final ObserverHolder observerHolder : observers) {
418 if (observerHolder.mountPoint == null ||
419 observerHolder.mountPoint.equals(mountPoint)) {
420 monitor.addObserver(observerHolder.typeCode, observerHolder.observer);
421 }
422 }
423
424
425 executorService.execute(monitor);
426
427 }
428
429
430
431
432
433
434 public void checkException() {
435
436 for (final Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
437 final OrekitException exception = entry.getValue().getException();
438 if (exception != null) {
439 throw exception;
440 }
441 }
442 }
443
444
445
446
447
448
449
450 public void stopStreaming(final int time) {
451
452
453 for (final Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
454 entry.getValue().stopMonitoring();
455 }
456
457 try {
458
459 executorService.awaitTermination(time, TimeUnit.MILLISECONDS);
460 } catch (InterruptedException ie) {
461
462 Thread.currentThread().interrupt();
463 }
464
465 checkException();
466
467 }
468
469
470
471
472
473
474 HttpURLConnection connect(final String mountPoint)
475 throws IOException {
476
477
478 final String protocol = "http";
479 final URL casterURL = new URL(protocol, host, port, "/" + mountPoint);
480 final HttpURLConnection connection = (HttpURLConnection) casterURL.openConnection(proxy);
481 connection.setConnectTimeout(timeout);
482 connection.setReadTimeout(timeout);
483
484
485 connection.setRequestProperty(HOST_HEADER_KEY, host);
486 connection.setRequestProperty(VERSION_HEADER_KEY, VERSION_HEADER_VALUE);
487 connection.setRequestProperty(USER_AGENT_HEADER_KEY, USER_AGENT_HEADER_VALUE);
488 connection.setRequestProperty(CONNECTION_HEADER_KEY, CONNECTION_HEADER_VALUE);
489
490 return connection;
491
492 }
493
494
495
496
497
498
499 private String getHeaderValue(final URLConnection connection, final String key) {
500 final String value = connection.getHeaderField(key);
501 if (value == null) {
502 throw new OrekitException(OrekitMessages.MISSING_HEADER,
503 connection.getURL().getHost(), key);
504 }
505 return value;
506 }
507
508
509 private static class ObserverHolder {
510
511
512 private final int typeCode;
513
514
515 private final String mountPoint;
516
517
518 private final MessageObserver observer;
519
520
521
522
523
524
525
526 ObserverHolder(final int typeCode, final String mountPoint,
527 final MessageObserver observer) {
528 this.typeCode = typeCode;
529 this.mountPoint = mountPoint;
530 this.observer = observer;
531 }
532
533 }
534
535 }