1   /* Copyright 2002-2021 CS GROUP
2    * Licensed to CS GROUP (CS) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * CS licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *   http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.orekit.gnss.metric.ntrip;
18  
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.net.HttpURLConnection;
22  import java.net.SocketTimeoutException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  import java.util.concurrent.atomic.AtomicReference;
30  
31  import org.orekit.errors.OrekitException;
32  import org.orekit.errors.OrekitInternalError;
33  import org.orekit.errors.OrekitMessages;
34  import org.orekit.gnss.metric.messages.ParsedMessage;
35  import org.orekit.gnss.metric.parser.AbstractEncodedMessages;
36  import org.orekit.gnss.metric.parser.MessagesParser;
37  
38  /** Monitor for retrieving streamed data from one mount point.
39   * @author Luc Maisonobe
40   * @since 11.0
41   */
42  public class StreamMonitor extends AbstractEncodedMessages implements Runnable {
43  
44      /** GGA header key. */
45      private static final String GGA_HEADER_KEY = "Ntrip-GGA";
46  
47      /** Content type for GNSS data. */
48      private static final String GNSS_DATA_CONTENT_TYPE = "gnss/data";
49  
50      /** Size of buffer for retrieving data. */
51      private static final int BUFFER_SIZE = 0x4000;
52  
53      /** Frame preamble. */
54      private static final int PREAMBLE = 0xD3;
55  
56      /** Frame preamble size. */
57      private static final int PREAMBLE_SIZE = 3;
58  
59      /** Frame CRC size. */
60      private static final int CRC_SIZE = 3;
61  
62      /** Generator polynomial for CRC. */
63      private static final int GENERATOR = 0x1864CFB;
64  
65      /** High bit of the generator polynomial. */
66      private static final int HIGH = 0x1000000;
67  
68      /** CRC 24Q lookup table. */
69      private static final int[] CRC_LOOKUP = new int[256];
70  
71      static {
72  
73          // set up lookup table
74          CRC_LOOKUP[0] = 0;
75          CRC_LOOKUP[1] = GENERATOR;
76  
77          int h = GENERATOR;
78          for (int i = 2; i < 256; i <<= 1) {
79              h <<= 1;
80              if ((h & HIGH) != 0) {
81                  h ^= GENERATOR;
82              }
83              for (int j = 0; j < i; ++j) {
84                  CRC_LOOKUP[i + j] = CRC_LOOKUP[j] ^ h;
85              }
86          }
87  
88      }
89  
90      /** Associated NTRIP client. */
91      private final NtripClient client;
92  
93      /** Mount point providing the stream. */
94      private final String mountPoint;
95  
96      /** Messages type of the mount point. */
97      private final Type type;
98  
99      /** Indicator for required NMEA. */
100     private final boolean nmeaRequired;
101 
102     /** Indicator for ignoring unknown messages. */
103     private final boolean ignoreUnknownMessageTypes;
104 
105     /** Delay before we reconnect after connection close. */
106     private final double reconnectDelay;
107 
108     /** Multiplication factor for reconnection delay. */
109     private final double reconnectDelayFactor;
110 
111     /** Max number of reconnections. */
112     private final int maxRetries;
113 
114     /** Stop flag. */
115     private AtomicBoolean stop;
116 
117     /** Circular buffer. */
118     private byte[] buffer;
119 
120     /** Read index. */
121     private int readIndex;
122 
123     /** Message end index. */
124     private int messageEndIndex;
125 
126     /** Write index. */
127     private int writeIndex;
128 
129     /** Observers for encoded messages. */
130     private final Map<Integer, List<MessageObserver>> observers;
131 
132     /** Last available message for each type. */
133     private final Map<Integer, ParsedMessage> lastMessages;
134 
135     /** Exception caught during monitoring. */
136     private final AtomicReference<OrekitException> exception;
137 
138     /** Build a monitor for streaming data from a mount point.
139      * @param client associated NTRIP client
140      * @param mountPoint mount point providing the stream
141      * @param type messages type of the mount point
142      * @param requiresNMEA if true, the mount point requires a NMEA GGA sentence in the request
143      * @param ignoreUnknownMessageTypes if true, unknown messages types are silently ignored
144      * @param reconnectDelay delay before we reconnect after connection close
145      * @param reconnectDelayFactor factor by which reconnection delay is multiplied after each attempt
146      * @param maxRetries max number of reconnect a attempts without reading any data
147      */
148     public StreamMonitor(final NtripClient client,
149                          final String mountPoint, final Type type,
150                          final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes,
151                          final double reconnectDelay, final double reconnectDelayFactor,
152                          final int maxRetries) {
153         this.client                    = client;
154         this.mountPoint                = mountPoint;
155         this.type                      = type;
156         this.nmeaRequired              = requiresNMEA;
157         this.ignoreUnknownMessageTypes = ignoreUnknownMessageTypes;
158         this.reconnectDelay            = reconnectDelay;
159         this.reconnectDelayFactor      = reconnectDelayFactor;
160         this.maxRetries                = maxRetries;
161         this.stop                      = new AtomicBoolean(false);
162         this.observers                 = new HashMap<>();
163         this.lastMessages              = new HashMap<>();
164         this.exception                 = new AtomicReference<OrekitException>(null);
165     }
166 
167     /** Add an observer for encoded messages.
168      * <p>
169      * If messages of the specified type have already been retrieved from
170      * a stream, the observer will be immediately notified with the last
171      * message as a side effect of being added.
172      * </p>
173      * @param typeCode code for the message type (if set to 0, notification
174      * will be triggered regardless of message type)
175      * @param observer observer for this message type
176      */
177     public void addObserver(final int typeCode, final MessageObserver observer) {
178         synchronized (observers) {
179 
180             // register the observer
181             List<MessageObserver> list = observers.get(typeCode);
182             if (list == null) {
183                 // create a new list the first time we register an observer for a message
184                 list =  new ArrayList<>();
185                 observers.put(typeCode, list);
186             }
187             list.add(observer);
188 
189             // if we already have a message of the proper type
190             // immediately notify the new observer about it
191             final ParsedMessage last = lastMessages.get(typeCode);
192             if (last != null) {
193                 observer.messageAvailable(mountPoint, last);
194             }
195 
196         }
197     }
198 
199     /** Stop monitoring. */
200     public void stopMonitoring() {
201         stop.set(true);
202     }
203 
204     /** Retrieve exception caught during monitoring.
205      * @return exception caught
206      */
207     public OrekitException getException() {
208         return exception.get();
209     }
210 
211     /** {@inheritDoc} */
212     @Override
213     public void run() {
214 
215         try {
216 
217             final MessagesParser parser = type.getParser(extractUsedMessages());
218             int nbAttempts = 0;
219             double delay = reconnectDelay;
220             while (nbAttempts < maxRetries) {
221 
222                 try {
223                     // prepare request
224                     final HttpURLConnection connection = client.connect(mountPoint);
225                     if (nmeaRequired) {
226                         if (client.getGGA() == null) {
227                             throw new OrekitException(OrekitMessages.STREAM_REQUIRES_NMEA_FIX, mountPoint);
228                         } else {
229                             // update NMEA GGA sentence in the extra headers for this mount point
230                             connection.setRequestProperty(GGA_HEADER_KEY, client.getGGA());
231                         }
232                     }
233 
234                     // perform request
235                     final int responseCode = connection.getResponseCode();
236                     if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
237                         throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, mountPoint);
238                     } else if (responseCode != HttpURLConnection.HTTP_OK) {
239                         throw new OrekitException(OrekitMessages.CONNECTION_ERROR,
240                                                   connection.getURL().getHost(),
241                                                   connection.getResponseMessage());
242                     }
243 
244                     // for this request, we MUST get GNSS data
245                     if (!GNSS_DATA_CONTENT_TYPE.equals(connection.getContentType())) {
246                         throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());
247                     }
248 
249                     // data extraction loop
250                     resetCircularBuffer();
251                     try (InputStream is = connection.getInputStream()) {
252 
253                         for (int r = fillUp(is); r >= 0; r = fillUp(is)) {
254 
255                             // we have read something, reset reconnection attempts counters
256                             nbAttempts = 0;
257                             delay      = reconnectDelay;
258 
259                             if (stop.get()) {
260                                 // stop monitoring immediately
261                                 // (returning closes the input stream automatically)
262                                 return;
263                             }
264 
265                             while (bufferSize() >= 3) {
266                                 if (peekByte(0) != PREAMBLE) {
267                                     // we are out of synch with respect to frame structure
268                                     // drop the unknown byte
269                                     moveRead(1);
270                                 } else {
271                                     final int size = (peekByte(1) & 0x03) << 8 | peekByte(2);
272                                     if (bufferSize() >= PREAMBLE_SIZE + size + CRC_SIZE) {
273                                         // check CRC
274                                         final int crc = (peekByte(PREAMBLE_SIZE + size)     << 16) |
275                                                         (peekByte(PREAMBLE_SIZE + size + 1) <<  8) |
276                                                          peekByte(PREAMBLE_SIZE + size + 2);
277                                         if (crc == computeCRC(PREAMBLE_SIZE + size)) {
278                                             // we have a complete and consistent frame
279                                             // we can extract the message it contains
280                                             messageEndIndex = (readIndex + PREAMBLE_SIZE + size) % BUFFER_SIZE;
281                                             moveRead(PREAMBLE_SIZE);
282                                             start();
283                                             final ParsedMessage message = parser.parse(this, ignoreUnknownMessageTypes);
284                                             if (message != null) {
285                                                 storeAndNotify(message);
286                                             }
287                                             // jump to expected message end, in case the message was corrupted
288                                             // and parsing did not reach message end
289                                             readIndex = (messageEndIndex + CRC_SIZE) % BUFFER_SIZE;
290                                         } else {
291                                             // CRC is not consistent, we are probably not really synched
292                                             // and the preamble byte was just a random byte
293                                             // we drop this single byte and continue looking for sync
294                                             moveRead(1);
295                                         }
296                                     } else {
297                                         // the frame is not complete, we need more data
298                                         break;
299                                     }
300                                 }
301                             }
302 
303                         }
304 
305                     }
306                 } catch (SocketTimeoutException ste) {
307                     // ignore exception, it will be handled by reconnection attempt below
308                 } catch (IOException ioe) {
309                     throw new OrekitException(ioe, OrekitMessages.CANNOT_PARSE_GNSS_DATA, client.getHost());
310                 }
311 
312                 // manage reconnection
313                 try {
314                     Thread.sleep((int) Math.rint(delay * 1000));
315                 } catch (InterruptedException ie) {
316                     // Restore interrupted state...
317                     Thread.currentThread().interrupt();
318                 }
319                 ++nbAttempts;
320                 delay *= reconnectDelayFactor;
321 
322             }
323 
324         } catch (OrekitException oe) {
325             // store the exception so it can be retrieved by Ntrip client
326             exception.set(oe);
327         }
328 
329     }
330 
331     /** Store a parsed encoded message and notify observers.
332      * @param message parsed message
333      */
334     private void storeAndNotify(final ParsedMessage message) {
335         synchronized (observers) {
336 
337             for (int typeCode : Arrays.asList(0, message.getTypeCode())) {
338 
339                 // store message
340                 lastMessages.put(typeCode, message);
341 
342                 // notify observers
343                 final List<MessageObserver> list = observers.get(typeCode);
344                 if (list != null) {
345                     for (final MessageObserver observer : list) {
346                         // notify observer
347                         observer.messageAvailable(mountPoint, message);
348                     }
349                 }
350 
351             }
352 
353         }
354     }
355 
356     /** Reset the circular buffer.
357      */
358     private void resetCircularBuffer() {
359         buffer     = new byte[BUFFER_SIZE];
360         readIndex  = 0;
361         writeIndex = 0;
362     }
363 
364     /** Extract data from input stream.
365      * @param is input stream to extract data from
366      * @return number of byes read or -1
367      * @throws IOException if data cannot be extracted properly
368      */
369     private int fillUp(final InputStream is) throws IOException {
370         final int max = bufferMaxWrite();
371         if (max == 0) {
372             // this should never happen
373             // the buffer is large enough for almost 16 encoded messages, including wrapping frame
374             throw new OrekitInternalError(null);
375         }
376         final int r = is.read(buffer, writeIndex, max);
377         if (r >= 0) {
378             writeIndex = (writeIndex + r) % BUFFER_SIZE;
379         }
380         return r;
381     }
382 
383     /** {@inheritDoc} */
384     @Override
385     protected int fetchByte() {
386         if (readIndex == messageEndIndex || readIndex == writeIndex) {
387             return -1;
388         }
389 
390         final int ret = buffer[readIndex] & 0xFF;
391         moveRead(1);
392         return ret;
393     }
394 
395     /** Get the number of bytes currently in the buffer.
396      * @return number of bytes currently in the buffer
397      */
398     private int bufferSize() {
399         final int n = writeIndex - readIndex;
400         return n >= 0 ? n : BUFFER_SIZE + n;
401     }
402 
403     /** Peek a buffer byte without moving read pointer.
404      * @param offset offset counted from read pointer
405      * @return value of the byte at given offset
406      */
407     private int peekByte(final int offset) {
408         return buffer[(readIndex + offset) % BUFFER_SIZE] & 0xFF;
409     }
410 
411     /** Move read pointer.
412      * @param n number of bytes to move read pointer
413      */
414     private void moveRead(final int n) {
415         readIndex = (readIndex + n) % BUFFER_SIZE;
416     }
417 
418     /** Get the number of bytes that can be added to the buffer without wrapping around.
419      * @return number of bytes that can be added
420      */
421     private int bufferMaxWrite() {
422         if (writeIndex >= readIndex) {
423             return (readIndex == 0 ? BUFFER_SIZE - 1 : BUFFER_SIZE) - writeIndex;
424         } else {
425             return readIndex - writeIndex - 1;
426         }
427     }
428 
429     /** Compute QualCom CRC.
430      * @param length length of the byte stream
431      * @return QualCom CRC
432      */
433     private int computeCRC(final int length) {
434         int crc = 0;
435         for (int i = 0; i < length; ++i) {
436             crc = ((crc << 8) ^ CRC_LOOKUP[peekByte(i) ^ (crc >>> 16)]) & (HIGH - 1);
437         }
438         return crc;
439     }
440 
441     private List<Integer> extractUsedMessages() {
442         synchronized (observers) {
443 
444             // List of needed messages
445             final List<Integer> messages = new ArrayList<>();
446 
447             // Loop on observers entries
448             for (Map.Entry<Integer, List<MessageObserver>> entry : observers.entrySet()) {
449                 // Extract message type code
450                 final int typeCode = entry.getKey();
451                 // Add to the list
452                 messages.add(typeCode);
453             }
454 
455             return messages;
456         }
457     }
458 
459 }