package org.orekit.gnss.metric.ntrip;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Authenticator;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.Proxy.Type;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Formatter;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.hipparchus.util.FastMath;
import org.orekit.errors.OrekitException;
import org.orekit.errors.OrekitMessages;
import org.orekit.gnss.metric.messages.ParsedMessage;

/** Source table for ntrip streams retrieval.
 * <p>
 * Note that all authentication is performed automatically by just
 * calling the standard {@link Authenticator#setDefault(Authenticator)}
 * method to set up an authenticator.
 * </p>
 * @author Luc Maisonobe
 * @since 11.0
public class NtripClient {

    /** Default timeout for connections and reads (ms). */
    public static final int DEFAULT_TIMEOUT = 10000;

    /** Default port for ntrip communication. */
    public static final int DEFAULT_PORT = 2101;

    /** Default delay before we reconnect after connection close (s). */
    public static final double DEFAULT_RECONNECT_DELAY = 1.0;

    /** Default factor by which reconnection delay is multiplied after each attempt. */
    public static final double DEFAULT_RECONNECT_DELAY_FACTOR = 1.5;

    /** Default maximum number of reconnect a attempts without readin any data. */
    public static final int DEFAULT_MAX_RECONNECT = 20;

    /** Host header. */
    private static final String HOST_HEADER_KEY = "Host";

    /** User-agent header key. */
    private static final String USER_AGENT_HEADER_KEY = "User-Agent";

    /** User-agent header value. */
    private static final String USER_AGENT_HEADER_VALUE = "NTRIP orekit/11.0";

    /** Version header key. */
    private static final String VERSION_HEADER_KEY = "Ntrip-Version";

    /** Version header value. */
    private static final String VERSION_HEADER_VALUE = "Ntrip/2.0";

    /** Connection header key. */
    private static final String CONNECTION_HEADER_KEY = "Connection";

    /** Connection header value. */
    private static final String CONNECTION_HEADER_VALUE = "close";

    /** Flags header key. */
    private static final String FLAGS_HEADER_KEY = "Ntrip-Flags";

    /** Content type for source table. */
    private static final String SOURCETABLE_CONTENT_TYPE = "gnss/sourcetable";

    /** Degrees to arc minutes conversion factor. */
    private static final double DEG_TO_MINUTES = 60.0;

    /** Caster host. */
    private final String host;

    /** Caster port. */
    private final int port;

    /** Delay before we reconnect after connection close. */
    private double reconnectDelay;

    /** Multiplication factor for reconnection delay. */
    private double reconnectDelayFactor;

    /** Max number of reconnections. */
    private int maxRetries;

    /** Timeout for connections and reads. */
    private int timeout;

    /** Proxy to use. */
    private Proxy proxy;

    /** NMEA GGA sentence (may be null). */
    private AtomicReference<String> gga;

    /** Observers for encoded messages. */
    private final List<ObserverHolder> observers;

    /** Monitors for data streams. */
    private final Map<String, StreamMonitor> monitors;

    /** Source table. */
    private SourceTable sourceTable;

    /** Executor for stream monitoring tasks. */
    private ExecutorService executorService;

    /** Build a client for NTRIP.
     * <p>
     * The default configuration uses default timeout, default reconnection
     * parameters, no GPS fix and no proxy.
     * </p>
     * @param host caster host providing the source table
     * @param port port to use for connection
     * see {@link #DEFAULT_PORT}
    public NtripClient(final String host, final int port) {
        this.host         = host;
        this.port         = port;
        this.observers    = new ArrayList<>();
        this.monitors     = new HashMap<>();
        setProxy(Type.DIRECT, null, -1);
        this.gga             = new AtomicReference<String>(null);
        this.sourceTable     = null;
        this.executorService = null;

    /** Get the caster host.
     * @return caster host
    public String getHost() {
        return host;

    /** Get the port to use for connection.
     * @return port to use for connection
    public int getPort() {
        return port;

    /** Set timeout for connections and reads.
     * @param timeout timeout for connections and reads (ms)
    public void setTimeout(final int timeout) {
        this.timeout = timeout;

    /** Set Reconnect parameters.
     * @param delay delay before we reconnect after connection close
     * @param delayFactor factor by which reconnection delay is multiplied after each attempt
     * @param max max number of reconnect a attempts without reading any data
    public void setReconnectParameters(final double delay,
                                       final double delayFactor,
                                       final int max) {
        this.reconnectDelay       = delay;
        this.reconnectDelayFactor = delayFactor;
        this.maxRetries           = max;

    /** Set proxy parameters.
     * @param type proxy type
     * @param proxyHost host name of the proxy (ignored if {@code type} is {@code Proxy.Type.DIRECT})
     * @param proxyPort port number of the proxy (ignored if {@code type} is {@code Proxy.Type.DIRECT})
    public void setProxy(final Proxy.Type type, final String proxyHost, final int proxyPort) {
        try {
            if (type == Proxy.Type.DIRECT) {
                // disable proxy
                proxy = Proxy.NO_PROXY;
            } else {
                // enable proxy
                final InetAddress   hostAddress  = InetAddress.getByName(proxyHost);
                final SocketAddress proxyAddress = new InetSocketAddress(hostAddress, proxyPort);
                proxy = new Proxy(type, proxyAddress);
        } catch (UnknownHostException uhe) {
            throw new OrekitException(uhe, OrekitMessages.UNKNOWN_HOST, proxyHost);

    /** Get proxy.
     * @return proxy to use
    public Proxy getProxy() {
        return proxy;

    /** Set GPS fix data to send as NMEA sentence to Ntrip caster if required.
     * @param hour hour of the fix (UTC time)
     * @param minute minute of the fix (UTC time)
     * @param second second of the fix (UTC time)
     * @param latitude latitude (radians)
     * @param longitude longitude (radians)
     * @param ellAltitude altitude above ellipsoid (m)
     * @param undulation height of the geoid above ellipsoid (m)
    public void setFix(final int hour, final int minute, final double second,
                       final double latitude, final double longitude, final double ellAltitude,
                       final double undulation) {

        // convert latitude
        final double latDeg = FastMath.abs(FastMath.toDegrees(latitude));
        final int    dLat   = (int) FastMath.floor(latDeg);
        final double mLat   = DEG_TO_MINUTES * (latDeg - dLat);
        final char   cLat   = latitude >= 0.0 ? 'N' : 'S';

        // convert longitude
        final double lonDeg = FastMath.abs(FastMath.toDegrees(longitude));
        final int    dLon   = (int) FastMath.floor(lonDeg);
        final double mLon   = DEG_TO_MINUTES * (lonDeg - dLon);
        final char   cLon   = longitude >= 0.0 ? 'E' : 'W';

        // build NMEA GGA sentence
        final StringBuilder builder = new StringBuilder(82);
        try (Formatter formatter = new Formatter(builder, Locale.US)) {

            // dummy values
            final int    fixQuality = 1;
            final int    nbSat      = 4;
            final double hdop       = 1.0;

            // sentence body
                             hour, minute, second,
                             dLat, mLat, cLat, dLon, mLon, cLon,
                             fixQuality, nbSat, hdop,
                             ellAltitude, undulation);

            // checksum
            byte sum = 0;
            for (int i = 1; i < builder.length(); ++i) {
                sum ^= builder.charAt(i);
            formatter.format("*%02X", sum);



    /** Get NMEA GGA sentence.
     * @return NMEA GGA sentence (may be null)
    String getGGA() {
        return gga.get();

    /** Add an observer for an encoded messages.
     * <p>
     * If messages of the specified type have already been retrieved from
     * a stream, the observer will be immediately notified with the last
     * message from each mount point (in unspecified order) as a side effect
     * of being added.
     * </p>
     * @param typeCode code for the message type (if set to 0, notification
     * will be triggered regardless of message type)
     * @param mountPoint mountPoint from which data must come (if null, notification
     * will be triggered regardless of mount point)
     * @param observer observer for this message type
    public void addObserver(final int typeCode, final String mountPoint,
                            final MessageObserver observer) {

        // store the observer for future monitored mount points
        observers.add(new ObserverHolder(typeCode, mountPoint, observer));

        // check if we should also add it to already monitored mount points
        for (Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
            if (mountPoint == null || mountPoint.equals(entry.getKey())) {
                entry.getValue().addObserver(typeCode, observer);


    /** Get a sourcetable.
     * @return source table from the caster
    public SourceTable getSourceTable() {
        if (sourceTable == null) {
            try {

                // perform request
                final HttpURLConnection connection = connect("");

                final int responseCode = connection.getResponseCode();
                if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
                    throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, "caster");
                } else if (responseCode != HttpURLConnection.HTTP_OK) {
                    throw new OrekitException(OrekitMessages.CONNECTION_ERROR, host, connection.getResponseMessage());

                // for this request, we MUST get a source table
                if (!SOURCETABLE_CONTENT_TYPE.equals(connection.getContentType())) {
                    throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());

                final SourceTable table = new SourceTable(getHeaderValue(connection, FLAGS_HEADER_KEY));

                // parse source table records
                try (InputStream is = connection.getInputStream();
                     InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
                     BufferedReader br = new BufferedReader(isr)) {
                    int lineNumber = 0;
                    for (String line = br.readLine(); line != null; line = br.readLine()) {

                        line = line.trim();
                        if (line.length() == 0) {

                        if (line.startsWith(RecordType.CAS.toString())) {
                            table.addCasterRecord(new CasterRecord(line));
                        } else if (line.startsWith(RecordType.NET.toString())) {
                            table.addNetworkRecord(new NetworkRecord(line));
                        } else if (line.startsWith(RecordType.STR.toString())) {
                            table.addDataStreamRecord(new DataStreamRecord(line));
                        } else if (line.startsWith("ENDSOURCETABLE")) {
                            // we have reached end of table
                        } else {
                            throw new OrekitException(OrekitMessages.SOURCETABLE_PARSE_ERROR,
                                                      connection.getURL().getHost(), lineNumber, line);


                sourceTable = table;
                return table;

            } catch (IOException | URISyntaxException e) {
                throw new OrekitException(e, OrekitMessages.CANNOT_PARSE_SOURCETABLE, host);

        return sourceTable;


    /** Connect to a mount point and start streaming data from it.
     * <p>
     * This method sets up an internal dedicated thread for continuously
     * monitoring data incoming from a mount point. When new complete
     * {@link ParsedMessage parsed messages} becomes available, the
     * {@link MessageObserver observers} that have been registered
     * using {@link #addObserver(int, String, MessageObserver) addObserver()}
     * method will be notified about the message.
     * </p>
     * <p>
     * This method must be called once for each stream to monitor.
     * </p>
     * @param mountPoint mount point providing the stream
     * @param type messages type of the mount point
     * @param requiresNMEA if true, the mount point requires a NMEA GGA sentence in the request
     * @param ignoreUnknownMessageTypes if true, unknown messages types are silently ignored
    public void startStreaming(final String mountPoint, final org.orekit.gnss.metric.ntrip.Type type,
                               final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes) {

        if (executorService == null) {
            // lazy creation of executor service, with one thread for each possible data stream
            executorService = Executors.newFixedThreadPool(getSourceTable().getDataStreams().size());

        // safety check
        if (monitors.containsKey(mountPoint)) {
            throw new OrekitException(OrekitMessages.MOUNPOINT_ALREADY_CONNECTED, mountPoint);

        // create the monitor
        final StreamMonitor monitor = new StreamMonitor(this, mountPoint, type, requiresNMEA, ignoreUnknownMessageTypes,
                                                        reconnectDelay, reconnectDelayFactor, maxRetries);
        monitors.put(mountPoint, monitor);

        // set up the already known observers
        for (final ObserverHolder observerHolder : observers) {
            if (observerHolder.mountPoint == null ||
                observerHolder.mountPoint.equals(mountPoint)) {
                monitor.addObserver(observerHolder.typeCode, observerHolder.observer);

        // start streaming data


    /** Check if any of the streaming thread has thrown an exception.
     * <p>
     * If a streaming thread has thrown an exception, it will be rethrown here
     * </p>
    public void checkException() {
        // check if any of the stream got an exception
        for (final  Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
            final OrekitException exception = entry.getValue().getException();
            if (exception != null) {
                throw exception;

    /** Stop streaming data from all connected mount points.
     * <p>
     * If an exception was encountered during data streaming, it will be rethrown here
     * </p>
     * @param time timeout for waiting underlying threads termination (ms)
    public void stopStreaming(final int time) {

        // ask all monitors to stop retrieving data
        for (final  Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {

        try {
            // wait for proper ending
            executorService.awaitTermination(time, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            // Restore interrupted state...



    /** Connect to caster.
     * @param mountPoint mount point (empty for getting sourcetable)
     * @return performed connection
     * @throws IOException if an I/O exception occurs during connection
     * @throws URISyntaxException if the built URI is invalid
    HttpURLConnection connect(final String mountPoint)
        throws IOException, URISyntaxException {

        // set up connection
        final String scheme = "http";
        final URL casterURL = new URI(scheme, null, host, port, "/" + mountPoint, null, null).toURL();
        final HttpURLConnection connection = (HttpURLConnection) casterURL.openConnection(proxy);

        // common headers
        connection.setRequestProperty(HOST_HEADER_KEY,       host);
        connection.setRequestProperty(VERSION_HEADER_KEY,    VERSION_HEADER_VALUE);
        connection.setRequestProperty(USER_AGENT_HEADER_KEY, USER_AGENT_HEADER_VALUE);
        connection.setRequestProperty(CONNECTION_HEADER_KEY, CONNECTION_HEADER_VALUE);

        return connection;


    /** Get an header from a response.
     * @param connection connection to analyze
     * @param key header key
     * @return header value
    private String getHeaderValue(final URLConnection connection, final String key) {
        final String value = connection.getHeaderField(key);
        if (value == null) {
            throw new OrekitException(OrekitMessages.MISSING_HEADER,
                                      connection.getURL().getHost(), key);
        return value;

    /** Local holder for observers. */
    private static class ObserverHolder {

        /** Code for the message type. */
        private final int typeCode;

        /** Mount point. */
        private final String mountPoint;

        /** Observer to notify. */
        private final MessageObserver observer;

        /** Simple constructor.
         * @param typeCode code for the message type
         * @param mountPoint mountPoint from which data must come (if null, notification
         * will be triggered regardless of mount point)
         * @param observer observer for this message type
        ObserverHolder(final int typeCode, final String mountPoint,
                            final MessageObserver observer) {
            this.typeCode   = typeCode;
            this.mountPoint = mountPoint;
            this.observer   = observer;

