1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.orekit.gnss.metric.ntrip;
18
19 import java.io.BufferedReader;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.InputStreamReader;
23 import java.net.Authenticator;
24 import java.net.HttpURLConnection;
25 import java.net.InetAddress;
26 import java.net.InetSocketAddress;
27 import java.net.Proxy;
28 import java.net.Proxy.Type;
29 import java.net.SocketAddress;
30 import java.net.URI;
31 import java.net.URISyntaxException;
32 import java.net.URL;
33 import java.net.URLConnection;
34 import java.net.UnknownHostException;
35 import java.nio.charset.StandardCharsets;
36 import java.util.ArrayList;
37 import java.util.Formatter;
38 import java.util.HashMap;
39 import java.util.List;
40 import java.util.Locale;
41 import java.util.Map;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.atomic.AtomicReference;
46
47 import org.hipparchus.util.FastMath;
48 import org.orekit.errors.OrekitException;
49 import org.orekit.errors.OrekitMessages;
50 import org.orekit.gnss.metric.messages.ParsedMessage;
51
52
53
54
55
56
57
58
59
60
61 public class NtripClient {
62
63
64 public static final int DEFAULT_TIMEOUT = 10000;
65
66
67 public static final int DEFAULT_PORT = 2101;
68
69
70 public static final double DEFAULT_RECONNECT_DELAY = 1.0;
71
72
73 public static final double DEFAULT_RECONNECT_DELAY_FACTOR = 1.5;
74
75
76 public static final int DEFAULT_MAX_RECONNECT = 20;
77
78
79 private static final String HOST_HEADER_KEY = "Host";
80
81
82 private static final String USER_AGENT_HEADER_KEY = "User-Agent";
83
84
85 private static final String USER_AGENT_HEADER_VALUE = "NTRIP orekit/11.0";
86
87
88 private static final String VERSION_HEADER_KEY = "Ntrip-Version";
89
90
91 private static final String VERSION_HEADER_VALUE = "Ntrip/2.0";
92
93
94 private static final String CONNECTION_HEADER_KEY = "Connection";
95
96
97 private static final String CONNECTION_HEADER_VALUE = "close";
98
99
100 private static final String FLAGS_HEADER_KEY = "Ntrip-Flags";
101
102
103 private static final String SOURCETABLE_CONTENT_TYPE = "gnss/sourcetable";
104
105
106 private static final double DEG_TO_MINUTES = 60.0;
107
108
109 private final String host;
110
111
112 private final int port;
113
114
115 private double reconnectDelay;
116
117
118 private double reconnectDelayFactor;
119
120
121 private int maxRetries;
122
123
124 private int timeout;
125
126
127 private Proxy proxy;
128
129
130 private AtomicReference<String> gga;
131
132
133 private final List<ObserverHolder> observers;
134
135
136 private final Map<String, StreamMonitor> monitors;
137
138
139 private SourceTable sourceTable;
140
141
142 private ExecutorService executorService;
143
144
145
146
147
148
149
150
151
152
153 public NtripClient(final String host, final int port) {
154 this.host = host;
155 this.port = port;
156 this.observers = new ArrayList<>();
157 this.monitors = new HashMap<>();
158 setTimeout(DEFAULT_TIMEOUT);
159 setReconnectParameters(DEFAULT_RECONNECT_DELAY,
160 DEFAULT_RECONNECT_DELAY_FACTOR,
161 DEFAULT_MAX_RECONNECT);
162 setProxy(Type.DIRECT, null, -1);
163 this.gga = new AtomicReference<String>(null);
164 this.sourceTable = null;
165 this.executorService = null;
166 }
167
168
169
170
171 public String getHost() {
172 return host;
173 }
174
175
176
177
178 public int getPort() {
179 return port;
180 }
181
182
183
184
185 public void setTimeout(final int timeout) {
186 this.timeout = timeout;
187 }
188
189
190
191
192
193
194 public void setReconnectParameters(final double delay,
195 final double delayFactor,
196 final int max) {
197 this.reconnectDelay = delay;
198 this.reconnectDelayFactor = delayFactor;
199 this.maxRetries = max;
200 }
201
202
203
204
205
206
207 public void setProxy(final Proxy.Type type, final String proxyHost, final int proxyPort) {
208 try {
209 if (type == Proxy.Type.DIRECT) {
210
211 proxy = Proxy.NO_PROXY;
212 } else {
213
214 final InetAddress hostAddress = InetAddress.getByName(proxyHost);
215 final SocketAddress proxyAddress = new InetSocketAddress(hostAddress, proxyPort);
216 proxy = new Proxy(type, proxyAddress);
217 }
218 } catch (UnknownHostException uhe) {
219 throw new OrekitException(uhe, OrekitMessages.UNKNOWN_HOST, proxyHost);
220 }
221 }
222
223
224
225
226 public Proxy getProxy() {
227 return proxy;
228 }
229
230
231
232
233
234
235
236
237
238
239 public void setFix(final int hour, final int minute, final double second,
240 final double latitude, final double longitude, final double ellAltitude,
241 final double undulation) {
242
243
244 final double latDeg = FastMath.abs(FastMath.toDegrees(latitude));
245 final int dLat = (int) FastMath.floor(latDeg);
246 final double mLat = DEG_TO_MINUTES * (latDeg - dLat);
247 final char cLat = latitude >= 0.0 ? 'N' : 'S';
248
249
250 final double lonDeg = FastMath.abs(FastMath.toDegrees(longitude));
251 final int dLon = (int) FastMath.floor(lonDeg);
252 final double mLon = DEG_TO_MINUTES * (lonDeg - dLon);
253 final char cLon = longitude >= 0.0 ? 'E' : 'W';
254
255
256 final StringBuilder builder = new StringBuilder(82);
257 try (Formatter formatter = new Formatter(builder, Locale.US)) {
258
259
260 final int fixQuality = 1;
261 final int nbSat = 4;
262 final double hdop = 1.0;
263
264
265 formatter.format("$GPGGA,%02d%02d%06.3f,%02d%07.4f,%c,%02d%07.4f,%c,%1d,%02d,%3.1f,%.1f,M,%.1f,M,,",
266 hour, minute, second,
267 dLat, mLat, cLat, dLon, mLon, cLon,
268 fixQuality, nbSat, hdop,
269 ellAltitude, undulation);
270
271
272 byte sum = 0;
273 for (int i = 1; i < builder.length(); ++i) {
274 sum ^= builder.charAt(i);
275 }
276 formatter.format("*%02X", sum);
277
278 }
279 gga.set(builder.toString());
280
281 }
282
283
284
285
286 String getGGA() {
287 return gga.get();
288 }
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303 public void addObserver(final int typeCode, final String mountPoint,
304 final MessageObserver observer) {
305
306
307 observers.add(new ObserverHolder(typeCode, mountPoint, observer));
308
309
310 for (Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
311 if (mountPoint == null || mountPoint.equals(entry.getKey())) {
312 entry.getValue().addObserver(typeCode, observer);
313 }
314 }
315
316 }
317
318
319
320
321 public SourceTable getSourceTable() {
322 if (sourceTable == null) {
323 try {
324
325
326 final HttpURLConnection connection = connect("");
327
328 final int responseCode = connection.getResponseCode();
329 if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
330 throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, "caster");
331 } else if (responseCode != HttpURLConnection.HTTP_OK) {
332 throw new OrekitException(OrekitMessages.CONNECTION_ERROR, host, connection.getResponseMessage());
333 }
334
335
336 if (!SOURCETABLE_CONTENT_TYPE.equals(connection.getContentType())) {
337 throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());
338 }
339
340 final SourceTable table = new SourceTable(getHeaderValue(connection, FLAGS_HEADER_KEY));
341
342
343 try (InputStream is = connection.getInputStream();
344 InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
345 BufferedReader br = new BufferedReader(isr)) {
346 int lineNumber = 0;
347 for (String line = br.readLine(); line != null; line = br.readLine()) {
348
349 ++lineNumber;
350 line = line.trim();
351 if (line.length() == 0) {
352 continue;
353 }
354
355 if (line.startsWith(RecordType.CAS.toString())) {
356 table.addCasterRecord(new CasterRecord(line));
357 } else if (line.startsWith(RecordType.NET.toString())) {
358 table.addNetworkRecord(new NetworkRecord(line));
359 } else if (line.startsWith(RecordType.STR.toString())) {
360 table.addDataStreamRecord(new DataStreamRecord(line));
361 } else if (line.startsWith("ENDSOURCETABLE")) {
362
363 break;
364 } else {
365 throw new OrekitException(OrekitMessages.SOURCETABLE_PARSE_ERROR,
366 connection.getURL().getHost(), lineNumber, line);
367 }
368
369 }
370 }
371
372 sourceTable = table;
373 return table;
374
375 } catch (IOException | URISyntaxException e) {
376 throw new OrekitException(e, OrekitMessages.CANNOT_PARSE_SOURCETABLE, host);
377 }
378 }
379
380 return sourceTable;
381
382 }
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401 public void startStreaming(final String mountPoint, final org.orekit.gnss.metric.ntrip.Type type,
402 final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes) {
403
404 if (executorService == null) {
405
406 executorService = Executors.newFixedThreadPool(getSourceTable().getDataStreams().size());
407 }
408
409
410 if (monitors.containsKey(mountPoint)) {
411 throw new OrekitException(OrekitMessages.MOUNPOINT_ALREADY_CONNECTED, mountPoint);
412 }
413
414
415 final StreamMonitor monitor = new StreamMonitor(this, mountPoint, type, requiresNMEA, ignoreUnknownMessageTypes,
416 reconnectDelay, reconnectDelayFactor, maxRetries);
417 monitors.put(mountPoint, monitor);
418
419
420 for (final ObserverHolder observerHolder : observers) {
421 if (observerHolder.mountPoint == null ||
422 observerHolder.mountPoint.equals(mountPoint)) {
423 monitor.addObserver(observerHolder.typeCode, observerHolder.observer);
424 }
425 }
426
427
428 executorService.execute(monitor);
429
430 }
431
432
433
434
435
436
437 public void checkException() {
438
439 for (final Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
440 final OrekitException exception = entry.getValue().getException();
441 if (exception != null) {
442 throw exception;
443 }
444 }
445 }
446
447
448
449
450
451
452
453 public void stopStreaming(final int time) {
454
455
456 for (final Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
457 entry.getValue().stopMonitoring();
458 }
459
460 try {
461
462 executorService.shutdown();
463 executorService.awaitTermination(time, TimeUnit.MILLISECONDS);
464 } catch (InterruptedException ie) {
465
466 Thread.currentThread().interrupt();
467 }
468
469 checkException();
470
471 }
472
473
474
475
476
477
478
479 HttpURLConnection connect(final String mountPoint)
480 throws IOException, URISyntaxException {
481
482
483 final String scheme = "http";
484 final URL casterURL = new URI(scheme, null, host, port, "/" + mountPoint, null, null).toURL();
485 final HttpURLConnection connection = (HttpURLConnection) casterURL.openConnection(proxy);
486 connection.setConnectTimeout(timeout);
487 connection.setReadTimeout(timeout);
488
489
490 connection.setRequestProperty(HOST_HEADER_KEY, host);
491 connection.setRequestProperty(VERSION_HEADER_KEY, VERSION_HEADER_VALUE);
492 connection.setRequestProperty(USER_AGENT_HEADER_KEY, USER_AGENT_HEADER_VALUE);
493 connection.setRequestProperty(CONNECTION_HEADER_KEY, CONNECTION_HEADER_VALUE);
494
495 return connection;
496
497 }
498
499
500
501
502
503
504 private String getHeaderValue(final URLConnection connection, final String key) {
505 final String value = connection.getHeaderField(key);
506 if (value == null) {
507 throw new OrekitException(OrekitMessages.MISSING_HEADER,
508 connection.getURL().getHost(), key);
509 }
510 return value;
511 }
512
513
514 private static class ObserverHolder {
515
516
517 private final int typeCode;
518
519
520 private final String mountPoint;
521
522
523 private final MessageObserver observer;
524
525
526
527
528
529
530
531 ObserverHolder(final int typeCode, final String mountPoint,
532 final MessageObserver observer) {
533 this.typeCode = typeCode;
534 this.mountPoint = mountPoint;
535 this.observer = observer;
536 }
537
538 }
539
540 }