NtripClient.java

  1. /* Copyright 2002-2025 CS GROUP
  2.  * Licensed to CS GROUP (CS) under one or more
  3.  * contributor license agreements.  See the NOTICE file distributed with
  4.  * this work for additional information regarding copyright ownership.
  5.  * CS licenses this file to You under the Apache License, Version 2.0
  6.  * (the "License"); you may not use this file except in compliance with
  7.  * the License.  You may obtain a copy of the License at
  8.  *
  9.  *   http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.orekit.gnss.metric.ntrip;

  18. import java.io.BufferedReader;
  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.io.InputStreamReader;
  22. import java.net.Authenticator;
  23. import java.net.HttpURLConnection;
  24. import java.net.InetAddress;
  25. import java.net.InetSocketAddress;
  26. import java.net.Proxy;
  27. import java.net.Proxy.Type;
  28. import java.net.SocketAddress;
  29. import java.net.URI;
  30. import java.net.URISyntaxException;
  31. import java.net.URL;
  32. import java.net.URLConnection;
  33. import java.net.UnknownHostException;
  34. import java.nio.charset.StandardCharsets;
  35. import java.util.ArrayList;
  36. import java.util.Formatter;
  37. import java.util.HashMap;
  38. import java.util.List;
  39. import java.util.Locale;
  40. import java.util.Map;
  41. import java.util.concurrent.ExecutorService;
  42. import java.util.concurrent.Executors;
  43. import java.util.concurrent.TimeUnit;
  44. import java.util.concurrent.atomic.AtomicReference;

  45. import org.hipparchus.util.FastMath;
  46. import org.orekit.errors.OrekitException;
  47. import org.orekit.errors.OrekitMessages;
  48. import org.orekit.gnss.metric.messages.ParsedMessage;
  49. import org.orekit.time.TimeScales;

  50. /** Client for retrieving the source table and the data streams of an NTRIP caster.
  51.  * <p>
  52.  * Note that all authentication is performed automatically by just
  53.  * calling the standard {@link Authenticator#setDefault(Authenticator)}
  54.  * method to set up an authenticator.
  55.  * </p>
  56.  * @author Luc Maisonobe
  57.  * @since 11.0
  58.  */
  59. public class NtripClient {

  60.     /** Default timeout for connections and reads (ms). */
  61.     public static final int DEFAULT_TIMEOUT = 10000;

  62.     /** Default port for ntrip communication. */
  63.     public static final int DEFAULT_PORT = 2101;

  64.     /** Default delay before we reconnect after connection close (s). */
  65.     public static final double DEFAULT_RECONNECT_DELAY = 1.0;

  66.     /** Default factor by which reconnection delay is multiplied after each attempt. */
  67.     public static final double DEFAULT_RECONNECT_DELAY_FACTOR = 1.5;

  68.     /** Default maximum number of reconnect a attempts without readin any data. */
  69.     public static final int DEFAULT_MAX_RECONNECT = 20;

  70.     /** Host header. */
  71.     private static final String HOST_HEADER_KEY = "Host";

  72.     /** User-agent header key. */
  73.     private static final String USER_AGENT_HEADER_KEY = "User-Agent";

  74.     /** User-agent header value. */
  75.     private static final String USER_AGENT_HEADER_VALUE = "NTRIP orekit/11.0";

  76.     /** Version header key. */
  77.     private static final String VERSION_HEADER_KEY = "Ntrip-Version";

  78.     /** Version header value. */
  79.     private static final String VERSION_HEADER_VALUE = "Ntrip/2.0";

  80.     /** Connection header key. */
  81.     private static final String CONNECTION_HEADER_KEY = "Connection";

  82.     /** Connection header value. */
  83.     private static final String CONNECTION_HEADER_VALUE = "close";

  84.     /** Flags header key. */
  85.     private static final String FLAGS_HEADER_KEY = "Ntrip-Flags";

  86.     /** Content type for source table. */
  87.     private static final String SOURCETABLE_CONTENT_TYPE = "gnss/sourcetable";

  88.     /** Degrees to arc minutes conversion factor. */
  89.     private static final double DEG_TO_MINUTES = 60.0;

  90.     /** Caster host. */
  91.     private final String host;

  92.     /** Caster port. */
  93.     private final int port;

  94.     /** Delay before we reconnect after connection close. */
  95.     private double reconnectDelay;

  96.     /** Multiplication factor for reconnection delay. */
  97.     private double reconnectDelayFactor;

  98.     /** Max number of reconnections. */
  99.     private int maxRetries;

  100.     /** Timeout for connections and reads. */
  101.     private int timeout;

  102.     /** Proxy to use. */
  103.     private Proxy proxy;

  104.     /** NMEA GGA sentence (may be null). */
  105.     private AtomicReference<String> gga;

  106.     /** Observers for encoded messages. */
  107.     private final List<ObserverHolder> observers;

  108.     /** Monitors for data streams. */
  109.     private final Map<String, StreamMonitor> monitors;

  110.     /** Source table. */
  111.     private SourceTable sourceTable;

  112.     /** Executor for stream monitoring tasks. */
  113.     private ExecutorService executorService;

  114.     /** Known time scales.
  115.      * @since 13.0
  116.      */
  117.     private final TimeScales timeScales;

  118.     /** Build a client for NTRIP.
  119.      * <p>
  120.      * The default configuration uses default timeout, default reconnection
  121.      * parameters, no GPS fix and no proxy.
  122.      * </p>
  123.      * @param host caster host providing the source table
  124.      * @param port port to use for connection
  125.      * @param timeScales known time scales
  126.      * @since 13.0
  127.      * see {@link #DEFAULT_PORT}
  128.      */
  129.     public NtripClient(final String host, final int port, final TimeScales timeScales) {
  130.         this.host         = host;
  131.         this.port         = port;
  132.         this.observers    = new ArrayList<>();
  133.         this.monitors     = new HashMap<>();
  134.         setTimeout(DEFAULT_TIMEOUT);
  135.         setReconnectParameters(DEFAULT_RECONNECT_DELAY,
  136.                                DEFAULT_RECONNECT_DELAY_FACTOR,
  137.                                DEFAULT_MAX_RECONNECT);
  138.         setProxy(Type.DIRECT, null, -1);
  139.         this.gga             = new AtomicReference<>(null);
  140.         this.sourceTable     = null;
  141.         this.executorService = null;
  142.         this.timeScales      = timeScales;
  143.     }

  144.     /** Get the caster host.
  145.      * @return caster host
  146.      */
  147.     public String getHost() {
  148.         return host;
  149.     }

  150.     /** Get the port to use for connection.
  151.      * @return port to use for connection
  152.      */
  153.     public int getPort() {
  154.         return port;
  155.     }

  156.     /** Get the known time scales.
  157.      * @return known time scales
  158.      * @since 13.0
  159.      */
  160.     public TimeScales getTimeScales() {
  161.         return timeScales;
  162.     }

  163.     /** Set timeout for connections and reads.
  164.      * @param timeout timeout for connections and reads (ms)
  165.      */
  166.     public void setTimeout(final int timeout) {
  167.         this.timeout = timeout;
  168.     }

  169.     /** Set Reconnect parameters.
  170.      * @param delay delay before we reconnect after connection close
  171.      * @param delayFactor factor by which reconnection delay is multiplied after each attempt
  172.      * @param max max number of reconnect a attempts without reading any data
  173.      */
  174.     public void setReconnectParameters(final double delay,
  175.                                        final double delayFactor,
  176.                                        final int max) {
  177.         this.reconnectDelay       = delay;
  178.         this.reconnectDelayFactor = delayFactor;
  179.         this.maxRetries           = max;
  180.     }

  181.     /** Set proxy parameters.
  182.      * @param type proxy type
  183.      * @param proxyHost host name of the proxy (ignored if {@code type} is {@code Proxy.Type.DIRECT})
  184.      * @param proxyPort port number of the proxy (ignored if {@code type} is {@code Proxy.Type.DIRECT})
  185.      */
  186.     public void setProxy(final Proxy.Type type, final String proxyHost, final int proxyPort) {
  187.         try {
  188.             if (type == Proxy.Type.DIRECT) {
  189.                 // disable proxy
  190.                 proxy = Proxy.NO_PROXY;
  191.             } else {
  192.                 // enable proxy
  193.                 final InetAddress   hostAddress  = InetAddress.getByName(proxyHost);
  194.                 final SocketAddress proxyAddress = new InetSocketAddress(hostAddress, proxyPort);
  195.                 proxy = new Proxy(type, proxyAddress);
  196.             }
  197.         } catch (UnknownHostException uhe) {
  198.             throw new OrekitException(uhe, OrekitMessages.UNKNOWN_HOST, proxyHost);
  199.         }
  200.     }

  201.     /** Get proxy.
  202.      * @return proxy to use
  203.      */
  204.     public Proxy getProxy() {
  205.         return proxy;
  206.     }

  207.     /** Set GPS fix data to send as NMEA sentence to Ntrip caster if required.
  208.      * @param hour hour of the fix (UTC time)
  209.      * @param minute minute of the fix (UTC time)
  210.      * @param second second of the fix (UTC time)
  211.      * @param latitude latitude (radians)
  212.      * @param longitude longitude (radians)
  213.      * @param ellAltitude altitude above ellipsoid (m)
  214.      * @param undulation height of the geoid above ellipsoid (m)
  215.      */
  216.     public void setFix(final int hour, final int minute, final double second,
  217.                        final double latitude, final double longitude, final double ellAltitude,
  218.                        final double undulation) {

  219.         // convert latitude
  220.         final double latDeg = FastMath.abs(FastMath.toDegrees(latitude));
  221.         final int    dLat   = (int) FastMath.floor(latDeg);
  222.         final double mLat   = DEG_TO_MINUTES * (latDeg - dLat);
  223.         final char   cLat   = latitude >= 0.0 ? 'N' : 'S';

  224.         // convert longitude
  225.         final double lonDeg = FastMath.abs(FastMath.toDegrees(longitude));
  226.         final int    dLon   = (int) FastMath.floor(lonDeg);
  227.         final double mLon   = DEG_TO_MINUTES * (lonDeg - dLon);
  228.         final char   cLon   = longitude >= 0.0 ? 'E' : 'W';

  229.         // build NMEA GGA sentence
  230.         final StringBuilder builder = new StringBuilder(82);
  231.         try (Formatter formatter = new Formatter(builder, Locale.US)) {

  232.             // dummy values
  233.             final int    fixQuality = 1;
  234.             final int    nbSat      = 4;
  235.             final double hdop       = 1.0;

  236.             // sentence body
  237.             formatter.format("$GPGGA,%02d%02d%06.3f,%02d%07.4f,%c,%02d%07.4f,%c,%1d,%02d,%3.1f,%.1f,M,%.1f,M,,",
  238.                              hour, minute, second,
  239.                              dLat, mLat, cLat, dLon, mLon, cLon,
  240.                              fixQuality, nbSat, hdop,
  241.                              ellAltitude, undulation);

  242.             // checksum
  243.             byte sum = 0;
  244.             for (int i = 1; i < builder.length(); ++i) {
  245.                 sum ^= builder.charAt(i);
  246.             }
  247.             formatter.format("*%02X", sum);

  248.         }
  249.         gga.set(builder.toString());

  250.     }

  251.     /** Get NMEA GGA sentence.
  252.      * @return NMEA GGA sentence (may be null)
  253.      */
  254.     String getGGA() {
  255.         return gga.get();
  256.     }

  257.     /** Add an observer for an encoded messages.
  258.      * <p>
  259.      * If messages of the specified type have already been retrieved from
  260.      * a stream, the observer will be immediately notified with the last
  261.      * message from each mount point (in unspecified order) as a side effect
  262.      * of being added.
  263.      * </p>
  264.      * @param typeCode code for the message type (if set to 0, notification
  265.      * will be triggered regardless of message type)
  266.      * @param mountPoint mountPoint from which data must come (if null, notification
  267.      * will be triggered regardless of mount point)
  268.      * @param observer observer for this message type
  269.      */
  270.     public void addObserver(final int typeCode, final String mountPoint,
  271.                             final MessageObserver observer) {

  272.         // store the observer for future monitored mount points
  273.         observers.add(new ObserverHolder(typeCode, mountPoint, observer));

  274.         // check if we should also add it to already monitored mount points
  275.         for (Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
  276.             if (mountPoint == null || mountPoint.equals(entry.getKey())) {
  277.                 entry.getValue().addObserver(typeCode, observer);
  278.             }
  279.         }

  280.     }

  281.     /** Get a sourcetable.
  282.      * @return source table from the caster
  283.      */
  284.     public SourceTable getSourceTable() {
  285.         if (sourceTable == null) {
  286.             try {

  287.                 // perform request
  288.                 final HttpURLConnection connection = connect("");

  289.                 final int responseCode = connection.getResponseCode();
  290.                 if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
  291.                     throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, "caster");
  292.                 } else if (responseCode != HttpURLConnection.HTTP_OK) {
  293.                     throw new OrekitException(OrekitMessages.CONNECTION_ERROR, host, connection.getResponseMessage());
  294.                 }

  295.                 // for this request, we MUST get a source table
  296.                 if (!SOURCETABLE_CONTENT_TYPE.equals(connection.getContentType())) {
  297.                     throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());
  298.                 }

  299.                 final SourceTable table = new SourceTable(getHeaderValue(connection, FLAGS_HEADER_KEY));

  300.                 // parse source table records
  301.                 try (InputStream is = connection.getInputStream();
  302.                      InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
  303.                      BufferedReader br = new BufferedReader(isr)) {
  304.                     int lineNumber = 0;
  305.                     for (String line = br.readLine(); line != null; line = br.readLine()) {

  306.                         ++lineNumber;
  307.                         line = line.trim();
  308.                         if (line.length() == 0) {
  309.                             continue;
  310.                         }

  311.                         if (line.startsWith(RecordType.CAS.toString())) {
  312.                             table.addCasterRecord(new CasterRecord(line));
  313.                         } else if (line.startsWith(RecordType.NET.toString())) {
  314.                             table.addNetworkRecord(new NetworkRecord(line));
  315.                         } else if (line.startsWith(RecordType.STR.toString())) {
  316.                             table.addDataStreamRecord(new DataStreamRecord(line));
  317.                         } else if (line.startsWith("ENDSOURCETABLE")) {
  318.                             // we have reached end of table
  319.                             break;
  320.                         } else {
  321.                             throw new OrekitException(OrekitMessages.SOURCETABLE_PARSE_ERROR,
  322.                                                       connection.getURL().getHost(), lineNumber, line);
  323.                         }

  324.                     }
  325.                 }

  326.                 sourceTable = table;
  327.                 return table;

  328.             } catch (IOException | URISyntaxException e) {
  329.                 throw new OrekitException(e, OrekitMessages.CANNOT_PARSE_SOURCETABLE, host);
  330.             }
  331.         }

  332.         return sourceTable;

  333.     }

  334.     /** Connect to a mount point and start streaming data from it.
  335.      * <p>
  336.      * This method sets up an internal dedicated thread for continuously
  337.      * monitoring data incoming from a mount point. When new complete
  338.      * {@link ParsedMessage parsed messages} becomes available, the
  339.      * {@link MessageObserver observers} that have been registered
  340.      * using {@link #addObserver(int, String, MessageObserver) addObserver()}
  341.      * method will be notified about the message.
  342.      * </p>
  343.      * <p>
  344.      * This method must be called once for each stream to monitor.
  345.      * </p>
  346.      * @param mountPoint mount point providing the stream
  347.      * @param type messages type of the mount point
  348.      * @param requiresNMEA if true, the mount point requires a NMEA GGA sentence in the request
  349.      * @param ignoreUnknownMessageTypes if true, unknown messages types are silently ignored
  350.      */
  351.     public void startStreaming(final String mountPoint, final org.orekit.gnss.metric.ntrip.Type type,
  352.                                final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes) {

  353.         if (executorService == null) {
  354.             // lazy creation of executor service, with one thread for each possible data stream
  355.             executorService = Executors.newFixedThreadPool(getSourceTable().getDataStreams().size());
  356.         }

  357.         // safety check
  358.         if (monitors.containsKey(mountPoint)) {
  359.             throw new OrekitException(OrekitMessages.MOUNPOINT_ALREADY_CONNECTED, mountPoint);
  360.         }

  361.         // create the monitor
  362.         final StreamMonitor monitor = new StreamMonitor(this, mountPoint, type, requiresNMEA, ignoreUnknownMessageTypes,
  363.                                                         reconnectDelay, reconnectDelayFactor, maxRetries);
  364.         monitors.put(mountPoint, monitor);

  365.         // set up the already known observers
  366.         for (final ObserverHolder observerHolder : observers) {
  367.             if (observerHolder.mountPoint == null ||
  368.                 observerHolder.mountPoint.equals(mountPoint)) {
  369.                 monitor.addObserver(observerHolder.typeCode, observerHolder.observer);
  370.             }
  371.         }

  372.         // start streaming data
  373.         executorService.execute(monitor);

  374.     }

  375.     /** Check if any of the streaming thread has thrown an exception.
  376.      * <p>
  377.      * If a streaming thread has thrown an exception, it will be rethrown here
  378.      * </p>
  379.      */
  380.     public void checkException() {
  381.         // check if any of the stream got an exception
  382.         for (final  Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
  383.             final OrekitException exception = entry.getValue().getException();
  384.             if (exception != null) {
  385.                 throw exception;
  386.             }
  387.         }
  388.     }

  389.     /** Stop streaming data from all connected mount points.
  390.      * <p>
  391.      * If an exception was encountered during data streaming, it will be rethrown here
  392.      * </p>
  393.      * @param time timeout for waiting underlying threads termination (ms)
  394.      */
  395.     public void stopStreaming(final int time) {

  396.         // ask all monitors to stop retrieving data
  397.         for (final  Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
  398.             entry.getValue().stopMonitoring();
  399.         }

  400.         try {
  401.             // wait for proper ending
  402.             executorService.shutdown();
  403.             executorService.awaitTermination(time, TimeUnit.MILLISECONDS);
  404.         } catch (InterruptedException ie) {
  405.             // Restore interrupted state...
  406.             Thread.currentThread().interrupt();
  407.         }

  408.         checkException();

  409.     }

  410.     /** Connect to caster.
  411.      * @param mountPoint mount point (empty for getting sourcetable)
  412.      * @return performed connection
  413.      * @throws IOException if an I/O exception occurs during connection
  414.      * @throws URISyntaxException if the built URI is invalid
  415.      */
  416.     HttpURLConnection connect(final String mountPoint)
  417.         throws IOException, URISyntaxException {

  418.         // set up connection
  419.         final String scheme = "http";
  420.         final URL casterURL = new URI(scheme, null, host, port, "/" + mountPoint, null, null).toURL();
  421.         final HttpURLConnection connection = (HttpURLConnection) casterURL.openConnection(proxy);
  422.         connection.setConnectTimeout(timeout);
  423.         connection.setReadTimeout(timeout);

  424.         // common headers
  425.         connection.setRequestProperty(HOST_HEADER_KEY,       host);
  426.         connection.setRequestProperty(VERSION_HEADER_KEY,    VERSION_HEADER_VALUE);
  427.         connection.setRequestProperty(USER_AGENT_HEADER_KEY, USER_AGENT_HEADER_VALUE);
  428.         connection.setRequestProperty(CONNECTION_HEADER_KEY, CONNECTION_HEADER_VALUE);

  429.         return connection;

  430.     }

  431.     /** Get an header from a response.
  432.      * @param connection connection to analyze
  433.      * @param key header key
  434.      * @return header value
  435.      */
  436.     private String getHeaderValue(final URLConnection connection, final String key) {
  437.         final String value = connection.getHeaderField(key);
  438.         if (value == null) {
  439.             throw new OrekitException(OrekitMessages.MISSING_HEADER,
  440.                                       connection.getURL().getHost(), key);
  441.         }
  442.         return value;
  443.     }

  444.     /** Local holder for observers. */
  445.     private static class ObserverHolder {

  446.         /** Code for the message type. */
  447.         private final int typeCode;

  448.         /** Mount point. */
  449.         private final String mountPoint;

  450.         /** Observer to notify. */
  451.         private final MessageObserver observer;

  452.         /** Simple constructor.
  453.          * @param typeCode code for the message type
  454.          * @param mountPoint mountPoint from which data must come (if null, notification
  455.          * will be triggered regardless of mount point)
  456.          * @param observer observer for this message type
  457.          */
  458.         ObserverHolder(final int typeCode, final String mountPoint,
  459.                             final MessageObserver observer) {
  460.             this.typeCode   = typeCode;
  461.             this.mountPoint = mountPoint;
  462.             this.observer   = observer;
  463.         }

  464.     }

  465. }