1   /* Copyright 2002-2024 CS GROUP
2    * Licensed to CS GROUP (CS) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * CS licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *   http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.orekit.gnss.metric.ntrip;
18  
19  import java.io.BufferedReader;
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.InputStreamReader;
23  import java.net.Authenticator;
24  import java.net.HttpURLConnection;
25  import java.net.InetAddress;
26  import java.net.InetSocketAddress;
27  import java.net.Proxy;
28  import java.net.Proxy.Type;
29  import java.net.SocketAddress;
30  import java.net.URI;
31  import java.net.URISyntaxException;
32  import java.net.URL;
33  import java.net.URLConnection;
34  import java.net.UnknownHostException;
35  import java.nio.charset.StandardCharsets;
36  import java.util.ArrayList;
37  import java.util.Formatter;
38  import java.util.HashMap;
39  import java.util.List;
40  import java.util.Locale;
41  import java.util.Map;
42  import java.util.concurrent.ExecutorService;
43  import java.util.concurrent.Executors;
44  import java.util.concurrent.TimeUnit;
45  import java.util.concurrent.atomic.AtomicReference;
46  
47  import org.hipparchus.util.FastMath;
48  import org.orekit.errors.OrekitException;
49  import org.orekit.errors.OrekitMessages;
50  import org.orekit.gnss.metric.messages.ParsedMessage;
51  
52  /** Source table for ntrip streams retrieval.
53   * <p>
54   * Note that all authentication is performed automatically by just
55   * calling the standard {@link Authenticator#setDefault(Authenticator)}
56   * method to set up an authenticator.
57   * </p>
58   * @author Luc Maisonobe
59   * @since 11.0
60   */
61  public class NtripClient {
62  
63      /** Default timeout for connections and reads (ms). */
64      public static final int DEFAULT_TIMEOUT = 10000;
65  
66      /** Default port for ntrip communication. */
67      public static final int DEFAULT_PORT = 2101;
68  
69      /** Default delay before we reconnect after connection close (s). */
70      public static final double DEFAULT_RECONNECT_DELAY = 1.0;
71  
72      /** Default factor by which reconnection delay is multiplied after each attempt. */
73      public static final double DEFAULT_RECONNECT_DELAY_FACTOR = 1.5;
74  
75      /** Default maximum number of reconnect a attempts without readin any data. */
76      public static final int DEFAULT_MAX_RECONNECT = 20;
77  
78      /** Host header. */
79      private static final String HOST_HEADER_KEY = "Host";
80  
81      /** User-agent header key. */
82      private static final String USER_AGENT_HEADER_KEY = "User-Agent";
83  
84      /** User-agent header value. */
85      private static final String USER_AGENT_HEADER_VALUE = "NTRIP orekit/11.0";
86  
87      /** Version header key. */
88      private static final String VERSION_HEADER_KEY = "Ntrip-Version";
89  
90      /** Version header value. */
91      private static final String VERSION_HEADER_VALUE = "Ntrip/2.0";
92  
93      /** Connection header key. */
94      private static final String CONNECTION_HEADER_KEY = "Connection";
95  
96      /** Connection header value. */
97      private static final String CONNECTION_HEADER_VALUE = "close";
98  
99      /** Flags header key. */
100     private static final String FLAGS_HEADER_KEY = "Ntrip-Flags";
101 
102     /** Content type for source table. */
103     private static final String SOURCETABLE_CONTENT_TYPE = "gnss/sourcetable";
104 
105     /** Degrees to arc minutes conversion factor. */
106     private static final double DEG_TO_MINUTES = 60.0;
107 
108     /** Caster host. */
109     private final String host;
110 
111     /** Caster port. */
112     private final int port;
113 
114     /** Delay before we reconnect after connection close. */
115     private double reconnectDelay;
116 
117     /** Multiplication factor for reconnection delay. */
118     private double reconnectDelayFactor;
119 
120     /** Max number of reconnections. */
121     private int maxRetries;
122 
123     /** Timeout for connections and reads. */
124     private int timeout;
125 
126     /** Proxy to use. */
127     private Proxy proxy;
128 
129     /** NMEA GGA sentence (may be null). */
130     private AtomicReference<String> gga;
131 
132     /** Observers for encoded messages. */
133     private final List<ObserverHolder> observers;
134 
135     /** Monitors for data streams. */
136     private final Map<String, StreamMonitor> monitors;
137 
138     /** Source table. */
139     private SourceTable sourceTable;
140 
141     /** Executor for stream monitoring tasks. */
142     private ExecutorService executorService;
143 
144     /** Build a client for NTRIP.
145      * <p>
146      * The default configuration uses default timeout, default reconnection
147      * parameters, no GPS fix and no proxy.
148      * </p>
149      * @param host caster host providing the source table
150      * @param port port to use for connection
151      * see {@link #DEFAULT_PORT}
152      */
153     public NtripClient(final String host, final int port) {
154         this.host         = host;
155         this.port         = port;
156         this.observers    = new ArrayList<>();
157         this.monitors     = new HashMap<>();
158         setTimeout(DEFAULT_TIMEOUT);
159         setReconnectParameters(DEFAULT_RECONNECT_DELAY,
160                                DEFAULT_RECONNECT_DELAY_FACTOR,
161                                DEFAULT_MAX_RECONNECT);
162         setProxy(Type.DIRECT, null, -1);
163         this.gga             = new AtomicReference<String>(null);
164         this.sourceTable     = null;
165         this.executorService = null;
166     }
167 
168     /** Get the caster host.
169      * @return caster host
170      */
171     public String getHost() {
172         return host;
173     }
174 
175     /** Get the port to use for connection.
176      * @return port to use for connection
177      */
178     public int getPort() {
179         return port;
180     }
181 
182     /** Set timeout for connections and reads.
183      * @param timeout timeout for connections and reads (ms)
184      */
185     public void setTimeout(final int timeout) {
186         this.timeout = timeout;
187     }
188 
189     /** Set Reconnect parameters.
190      * @param delay delay before we reconnect after connection close
191      * @param delayFactor factor by which reconnection delay is multiplied after each attempt
192      * @param max max number of reconnect a attempts without reading any data
193      */
194     public void setReconnectParameters(final double delay,
195                                        final double delayFactor,
196                                        final int max) {
197         this.reconnectDelay       = delay;
198         this.reconnectDelayFactor = delayFactor;
199         this.maxRetries           = max;
200     }
201 
202     /** Set proxy parameters.
203      * @param type proxy type
204      * @param proxyHost host name of the proxy (ignored if {@code type} is {@code Proxy.Type.DIRECT})
205      * @param proxyPort port number of the proxy (ignored if {@code type} is {@code Proxy.Type.DIRECT})
206      */
207     public void setProxy(final Proxy.Type type, final String proxyHost, final int proxyPort) {
208         try {
209             if (type == Proxy.Type.DIRECT) {
210                 // disable proxy
211                 proxy = Proxy.NO_PROXY;
212             } else {
213                 // enable proxy
214                 final InetAddress   hostAddress  = InetAddress.getByName(proxyHost);
215                 final SocketAddress proxyAddress = new InetSocketAddress(hostAddress, proxyPort);
216                 proxy = new Proxy(type, proxyAddress);
217             }
218         } catch (UnknownHostException uhe) {
219             throw new OrekitException(uhe, OrekitMessages.UNKNOWN_HOST, proxyHost);
220         }
221     }
222 
223     /** Get proxy.
224      * @return proxy to use
225      */
226     public Proxy getProxy() {
227         return proxy;
228     }
229 
230     /** Set GPS fix data to send as NMEA sentence to Ntrip caster if required.
231      * @param hour hour of the fix (UTC time)
232      * @param minute minute of the fix (UTC time)
233      * @param second second of the fix (UTC time)
234      * @param latitude latitude (radians)
235      * @param longitude longitude (radians)
236      * @param ellAltitude altitude above ellipsoid (m)
237      * @param undulation height of the geoid above ellipsoid (m)
238      */
239     public void setFix(final int hour, final int minute, final double second,
240                        final double latitude, final double longitude, final double ellAltitude,
241                        final double undulation) {
242 
243         // convert latitude
244         final double latDeg = FastMath.abs(FastMath.toDegrees(latitude));
245         final int    dLat   = (int) FastMath.floor(latDeg);
246         final double mLat   = DEG_TO_MINUTES * (latDeg - dLat);
247         final char   cLat   = latitude >= 0.0 ? 'N' : 'S';
248 
249         // convert longitude
250         final double lonDeg = FastMath.abs(FastMath.toDegrees(longitude));
251         final int    dLon   = (int) FastMath.floor(lonDeg);
252         final double mLon   = DEG_TO_MINUTES * (lonDeg - dLon);
253         final char   cLon   = longitude >= 0.0 ? 'E' : 'W';
254 
255         // build NMEA GGA sentence
256         final StringBuilder builder = new StringBuilder(82);
257         try (Formatter formatter = new Formatter(builder, Locale.US)) {
258 
259             // dummy values
260             final int    fixQuality = 1;
261             final int    nbSat      = 4;
262             final double hdop       = 1.0;
263 
264             // sentence body
265             formatter.format("$GPGGA,%02d%02d%06.3f,%02d%07.4f,%c,%02d%07.4f,%c,%1d,%02d,%3.1f,%.1f,M,%.1f,M,,",
266                              hour, minute, second,
267                              dLat, mLat, cLat, dLon, mLon, cLon,
268                              fixQuality, nbSat, hdop,
269                              ellAltitude, undulation);
270 
271             // checksum
272             byte sum = 0;
273             for (int i = 1; i < builder.length(); ++i) {
274                 sum ^= builder.charAt(i);
275             }
276             formatter.format("*%02X", sum);
277 
278         }
279         gga.set(builder.toString());
280 
281     }
282 
283     /** Get NMEA GGA sentence.
284      * @return NMEA GGA sentence (may be null)
285      */
286     String getGGA() {
287         return gga.get();
288     }
289 
290     /** Add an observer for an encoded messages.
291      * <p>
292      * If messages of the specified type have already been retrieved from
293      * a stream, the observer will be immediately notified with the last
294      * message from each mount point (in unspecified order) as a side effect
295      * of being added.
296      * </p>
297      * @param typeCode code for the message type (if set to 0, notification
298      * will be triggered regardless of message type)
299      * @param mountPoint mountPoint from which data must come (if null, notification
300      * will be triggered regardless of mount point)
301      * @param observer observer for this message type
302      */
303     public void addObserver(final int typeCode, final String mountPoint,
304                             final MessageObserver observer) {
305 
306         // store the observer for future monitored mount points
307         observers.add(new ObserverHolder(typeCode, mountPoint, observer));
308 
309         // check if we should also add it to already monitored mount points
310         for (Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
311             if (mountPoint == null || mountPoint.equals(entry.getKey())) {
312                 entry.getValue().addObserver(typeCode, observer);
313             }
314         }
315 
316     }
317 
318     /** Get a sourcetable.
319      * @return source table from the caster
320      */
321     public SourceTable getSourceTable() {
322         if (sourceTable == null) {
323             try {
324 
325                 // perform request
326                 final HttpURLConnection connection = connect("");
327 
328                 final int responseCode = connection.getResponseCode();
329                 if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
330                     throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, "caster");
331                 } else if (responseCode != HttpURLConnection.HTTP_OK) {
332                     throw new OrekitException(OrekitMessages.CONNECTION_ERROR, host, connection.getResponseMessage());
333                 }
334 
335                 // for this request, we MUST get a source table
336                 if (!SOURCETABLE_CONTENT_TYPE.equals(connection.getContentType())) {
337                     throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());
338                 }
339 
340                 final SourceTable table = new SourceTable(getHeaderValue(connection, FLAGS_HEADER_KEY));
341 
342                 // parse source table records
343                 try (InputStream is = connection.getInputStream();
344                      InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
345                      BufferedReader br = new BufferedReader(isr)) {
346                     int lineNumber = 0;
347                     for (String line = br.readLine(); line != null; line = br.readLine()) {
348 
349                         ++lineNumber;
350                         line = line.trim();
351                         if (line.length() == 0) {
352                             continue;
353                         }
354 
355                         if (line.startsWith(RecordType.CAS.toString())) {
356                             table.addCasterRecord(new CasterRecord(line));
357                         } else if (line.startsWith(RecordType.NET.toString())) {
358                             table.addNetworkRecord(new NetworkRecord(line));
359                         } else if (line.startsWith(RecordType.STR.toString())) {
360                             table.addDataStreamRecord(new DataStreamRecord(line));
361                         } else if (line.startsWith("ENDSOURCETABLE")) {
362                             // we have reached end of table
363                             break;
364                         } else {
365                             throw new OrekitException(OrekitMessages.SOURCETABLE_PARSE_ERROR,
366                                                       connection.getURL().getHost(), lineNumber, line);
367                         }
368 
369                     }
370                 }
371 
372                 sourceTable = table;
373                 return table;
374 
375             } catch (IOException | URISyntaxException e) {
376                 throw new OrekitException(e, OrekitMessages.CANNOT_PARSE_SOURCETABLE, host);
377             }
378         }
379 
380         return sourceTable;
381 
382     }
383 
384     /** Connect to a mount point and start streaming data from it.
385      * <p>
386      * This method sets up an internal dedicated thread for continuously
387      * monitoring data incoming from a mount point. When new complete
388      * {@link ParsedMessage parsed messages} becomes available, the
389      * {@link MessageObserver observers} that have been registered
390      * using {@link #addObserver(int, String, MessageObserver) addObserver()}
391      * method will be notified about the message.
392      * </p>
393      * <p>
394      * This method must be called once for each stream to monitor.
395      * </p>
396      * @param mountPoint mount point providing the stream
397      * @param type messages type of the mount point
398      * @param requiresNMEA if true, the mount point requires a NMEA GGA sentence in the request
399      * @param ignoreUnknownMessageTypes if true, unknown messages types are silently ignored
400      */
401     public void startStreaming(final String mountPoint, final org.orekit.gnss.metric.ntrip.Type type,
402                                final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes) {
403 
404         if (executorService == null) {
405             // lazy creation of executor service, with one thread for each possible data stream
406             executorService = Executors.newFixedThreadPool(getSourceTable().getDataStreams().size());
407         }
408 
409         // safety check
410         if (monitors.containsKey(mountPoint)) {
411             throw new OrekitException(OrekitMessages.MOUNPOINT_ALREADY_CONNECTED, mountPoint);
412         }
413 
414         // create the monitor
415         final StreamMonitor monitor = new StreamMonitor(this, mountPoint, type, requiresNMEA, ignoreUnknownMessageTypes,
416                                                         reconnectDelay, reconnectDelayFactor, maxRetries);
417         monitors.put(mountPoint, monitor);
418 
419         // set up the already known observers
420         for (final ObserverHolder observerHolder : observers) {
421             if (observerHolder.mountPoint == null ||
422                 observerHolder.mountPoint.equals(mountPoint)) {
423                 monitor.addObserver(observerHolder.typeCode, observerHolder.observer);
424             }
425         }
426 
427         // start streaming data
428         executorService.execute(monitor);
429 
430     }
431 
432     /** Check if any of the streaming thread has thrown an exception.
433      * <p>
434      * If a streaming thread has thrown an exception, it will be rethrown here
435      * </p>
436      */
437     public void checkException() {
438         // check if any of the stream got an exception
439         for (final  Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
440             final OrekitException exception = entry.getValue().getException();
441             if (exception != null) {
442                 throw exception;
443             }
444         }
445     }
446 
447     /** Stop streaming data from all connected mount points.
448      * <p>
449      * If an exception was encountered during data streaming, it will be rethrown here
450      * </p>
451      * @param time timeout for waiting underlying threads termination (ms)
452      */
453     public void stopStreaming(final int time) {
454 
455         // ask all monitors to stop retrieving data
456         for (final  Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
457             entry.getValue().stopMonitoring();
458         }
459 
460         try {
461             // wait for proper ending
462             executorService.shutdown();
463             executorService.awaitTermination(time, TimeUnit.MILLISECONDS);
464         } catch (InterruptedException ie) {
465             // Restore interrupted state...
466             Thread.currentThread().interrupt();
467         }
468 
469         checkException();
470 
471     }
472 
473     /** Connect to caster.
474      * @param mountPoint mount point (empty for getting sourcetable)
475      * @return performed connection
476      * @throws IOException if an I/O exception occurs during connection
477      * @throws URISyntaxException if the built URI is invalid
478      */
479     HttpURLConnection connect(final String mountPoint)
480         throws IOException, URISyntaxException {
481 
482         // set up connection
483         final String scheme = "http";
484         final URL casterURL = new URI(scheme, null, host, port, "/" + mountPoint, null, null).toURL();
485         final HttpURLConnection connection = (HttpURLConnection) casterURL.openConnection(proxy);
486         connection.setConnectTimeout(timeout);
487         connection.setReadTimeout(timeout);
488 
489         // common headers
490         connection.setRequestProperty(HOST_HEADER_KEY,       host);
491         connection.setRequestProperty(VERSION_HEADER_KEY,    VERSION_HEADER_VALUE);
492         connection.setRequestProperty(USER_AGENT_HEADER_KEY, USER_AGENT_HEADER_VALUE);
493         connection.setRequestProperty(CONNECTION_HEADER_KEY, CONNECTION_HEADER_VALUE);
494 
495         return connection;
496 
497     }
498 
499     /** Get an header from a response.
500      * @param connection connection to analyze
501      * @param key header key
502      * @return header value
503      */
504     private String getHeaderValue(final URLConnection connection, final String key) {
505         final String value = connection.getHeaderField(key);
506         if (value == null) {
507             throw new OrekitException(OrekitMessages.MISSING_HEADER,
508                                       connection.getURL().getHost(), key);
509         }
510         return value;
511     }
512 
513     /** Local holder for observers. */
514     private static class ObserverHolder {
515 
516         /** Code for the message type. */
517         private final int typeCode;
518 
519         /** Mount point. */
520         private final String mountPoint;
521 
522         /** Observer to notify. */
523         private final MessageObserver observer;
524 
525         /** Simple constructor.
526          * @param typeCode code for the message type
527          * @param mountPoint mountPoint from which data must come (if null, notification
528          * will be triggered regardless of mount point)
529          * @param observer observer for this message type
530          */
531         ObserverHolder(final int typeCode, final String mountPoint,
532                             final MessageObserver observer) {
533             this.typeCode   = typeCode;
534             this.mountPoint = mountPoint;
535             this.observer   = observer;
536         }
537 
538     }
539 
540 }