1 /* Copyright 2002-2021 CS GROUP
2 * Licensed to CS GROUP (CS) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * CS licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.orekit.gnss.metric.ntrip;
18
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.net.HttpURLConnection;
22 import java.net.SocketTimeoutException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.concurrent.atomic.AtomicReference;
30
31 import org.orekit.errors.OrekitException;
32 import org.orekit.errors.OrekitInternalError;
33 import org.orekit.errors.OrekitMessages;
34 import org.orekit.gnss.metric.messages.ParsedMessage;
35 import org.orekit.gnss.metric.parser.AbstractEncodedMessages;
36 import org.orekit.gnss.metric.parser.MessagesParser;
37
38 /** Monitor for retrieving streamed data from one mount point.
39 * @author Luc Maisonobe
40 * @since 11.0
41 */
42 public class StreamMonitor extends AbstractEncodedMessages implements Runnable {
43
44 /** GGA header key. */
45 private static final String GGA_HEADER_KEY = "Ntrip-GGA";
46
47 /** Content type for GNSS data. */
48 private static final String GNSS_DATA_CONTENT_TYPE = "gnss/data";
49
50 /** Size of buffer for retrieving data. */
51 private static final int BUFFER_SIZE = 0x4000;
52
53 /** Frame preamble. */
54 private static final int PREAMBLE = 0xD3;
55
56 /** Frame preamble size. */
57 private static final int PREAMBLE_SIZE = 3;
58
59 /** Frame CRC size. */
60 private static final int CRC_SIZE = 3;
61
62 /** Generator polynomial for CRC. */
63 private static final int GENERATOR = 0x1864CFB;
64
65 /** High bit of the generator polynomial. */
66 private static final int HIGH = 0x1000000;
67
68 /** CRC 24Q lookup table. */
69 private static final int[] CRC_LOOKUP = new int[256];
70
71 static {
72
73 // set up lookup table
74 CRC_LOOKUP[0] = 0;
75 CRC_LOOKUP[1] = GENERATOR;
76
77 int h = GENERATOR;
78 for (int i = 2; i < 256; i <<= 1) {
79 h <<= 1;
80 if ((h & HIGH) != 0) {
81 h ^= GENERATOR;
82 }
83 for (int j = 0; j < i; ++j) {
84 CRC_LOOKUP[i + j] = CRC_LOOKUP[j] ^ h;
85 }
86 }
87
88 }
89
90 /** Associated NTRIP client. */
91 private final NtripClient client;
92
93 /** Mount point providing the stream. */
94 private final String mountPoint;
95
96 /** Messages type of the mount point. */
97 private final Type type;
98
99 /** Indicator for required NMEA. */
100 private final boolean nmeaRequired;
101
102 /** Indicator for ignoring unknown messages. */
103 private final boolean ignoreUnknownMessageTypes;
104
105 /** Delay before we reconnect after connection close. */
106 private final double reconnectDelay;
107
108 /** Multiplication factor for reconnection delay. */
109 private final double reconnectDelayFactor;
110
111 /** Max number of reconnections. */
112 private final int maxRetries;
113
114 /** Stop flag. */
115 private AtomicBoolean stop;
116
117 /** Circular buffer. */
118 private byte[] buffer;
119
120 /** Read index. */
121 private int readIndex;
122
123 /** Message end index. */
124 private int messageEndIndex;
125
126 /** Write index. */
127 private int writeIndex;
128
129 /** Observers for encoded messages. */
130 private final Map<Integer, List<MessageObserver>> observers;
131
132 /** Last available message for each type. */
133 private final Map<Integer, ParsedMessage> lastMessages;
134
135 /** Exception caught during monitoring. */
136 private final AtomicReference<OrekitException> exception;
137
138 /** Build a monitor for streaming data from a mount point.
139 * @param client associated NTRIP client
140 * @param mountPoint mount point providing the stream
141 * @param type messages type of the mount point
142 * @param requiresNMEA if true, the mount point requires a NMEA GGA sentence in the request
143 * @param ignoreUnknownMessageTypes if true, unknown messages types are silently ignored
144 * @param reconnectDelay delay before we reconnect after connection close
145 * @param reconnectDelayFactor factor by which reconnection delay is multiplied after each attempt
146 * @param maxRetries max number of reconnect a attempts without reading any data
147 */
148 public StreamMonitor(final NtripClient client,
149 final String mountPoint, final Type type,
150 final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes,
151 final double reconnectDelay, final double reconnectDelayFactor,
152 final int maxRetries) {
153 this.client = client;
154 this.mountPoint = mountPoint;
155 this.type = type;
156 this.nmeaRequired = requiresNMEA;
157 this.ignoreUnknownMessageTypes = ignoreUnknownMessageTypes;
158 this.reconnectDelay = reconnectDelay;
159 this.reconnectDelayFactor = reconnectDelayFactor;
160 this.maxRetries = maxRetries;
161 this.stop = new AtomicBoolean(false);
162 this.observers = new HashMap<>();
163 this.lastMessages = new HashMap<>();
164 this.exception = new AtomicReference<OrekitException>(null);
165 }
166
167 /** Add an observer for encoded messages.
168 * <p>
169 * If messages of the specified type have already been retrieved from
170 * a stream, the observer will be immediately notified with the last
171 * message as a side effect of being added.
172 * </p>
173 * @param typeCode code for the message type (if set to 0, notification
174 * will be triggered regardless of message type)
175 * @param observer observer for this message type
176 */
177 public void addObserver(final int typeCode, final MessageObserver observer) {
178 synchronized (observers) {
179
180 // register the observer
181 List<MessageObserver> list = observers.get(typeCode);
182 if (list == null) {
183 // create a new list the first time we register an observer for a message
184 list = new ArrayList<>();
185 observers.put(typeCode, list);
186 }
187 list.add(observer);
188
189 // if we already have a message of the proper type
190 // immediately notify the new observer about it
191 final ParsedMessage last = lastMessages.get(typeCode);
192 if (last != null) {
193 observer.messageAvailable(mountPoint, last);
194 }
195
196 }
197 }
198
199 /** Stop monitoring. */
200 public void stopMonitoring() {
201 stop.set(true);
202 }
203
204 /** Retrieve exception caught during monitoring.
205 * @return exception caught
206 */
207 public OrekitException getException() {
208 return exception.get();
209 }
210
211 /** {@inheritDoc} */
212 @Override
213 public void run() {
214
215 try {
216
217 final MessagesParser parser = type.getParser(extractUsedMessages());
218 int nbAttempts = 0;
219 double delay = reconnectDelay;
220 while (nbAttempts < maxRetries) {
221
222 try {
223 // prepare request
224 final HttpURLConnection connection = client.connect(mountPoint);
225 if (nmeaRequired) {
226 if (client.getGGA() == null) {
227 throw new OrekitException(OrekitMessages.STREAM_REQUIRES_NMEA_FIX, mountPoint);
228 } else {
229 // update NMEA GGA sentence in the extra headers for this mount point
230 connection.setRequestProperty(GGA_HEADER_KEY, client.getGGA());
231 }
232 }
233
234 // perform request
235 final int responseCode = connection.getResponseCode();
236 if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
237 throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, mountPoint);
238 } else if (responseCode != HttpURLConnection.HTTP_OK) {
239 throw new OrekitException(OrekitMessages.CONNECTION_ERROR,
240 connection.getURL().getHost(),
241 connection.getResponseMessage());
242 }
243
244 // for this request, we MUST get GNSS data
245 if (!GNSS_DATA_CONTENT_TYPE.equals(connection.getContentType())) {
246 throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());
247 }
248
249 // data extraction loop
250 resetCircularBuffer();
251 try (InputStream is = connection.getInputStream()) {
252
253 for (int r = fillUp(is); r >= 0; r = fillUp(is)) {
254
255 // we have read something, reset reconnection attempts counters
256 nbAttempts = 0;
257 delay = reconnectDelay;
258
259 if (stop.get()) {
260 // stop monitoring immediately
261 // (returning closes the input stream automatically)
262 return;
263 }
264
265 while (bufferSize() >= 3) {
266 if (peekByte(0) != PREAMBLE) {
267 // we are out of synch with respect to frame structure
268 // drop the unknown byte
269 moveRead(1);
270 } else {
271 final int size = (peekByte(1) & 0x03) << 8 | peekByte(2);
272 if (bufferSize() >= PREAMBLE_SIZE + size + CRC_SIZE) {
273 // check CRC
274 final int crc = (peekByte(PREAMBLE_SIZE + size) << 16) |
275 (peekByte(PREAMBLE_SIZE + size + 1) << 8) |
276 peekByte(PREAMBLE_SIZE + size + 2);
277 if (crc == computeCRC(PREAMBLE_SIZE + size)) {
278 // we have a complete and consistent frame
279 // we can extract the message it contains
280 messageEndIndex = (readIndex + PREAMBLE_SIZE + size) % BUFFER_SIZE;
281 moveRead(PREAMBLE_SIZE);
282 start();
283 final ParsedMessage message = parser.parse(this, ignoreUnknownMessageTypes);
284 if (message != null) {
285 storeAndNotify(message);
286 }
287 // jump to expected message end, in case the message was corrupted
288 // and parsing did not reach message end
289 readIndex = (messageEndIndex + CRC_SIZE) % BUFFER_SIZE;
290 } else {
291 // CRC is not consistent, we are probably not really synched
292 // and the preamble byte was just a random byte
293 // we drop this single byte and continue looking for sync
294 moveRead(1);
295 }
296 } else {
297 // the frame is not complete, we need more data
298 break;
299 }
300 }
301 }
302
303 }
304
305 }
306 } catch (SocketTimeoutException ste) {
307 // ignore exception, it will be handled by reconnection attempt below
308 } catch (IOException ioe) {
309 throw new OrekitException(ioe, OrekitMessages.CANNOT_PARSE_GNSS_DATA, client.getHost());
310 }
311
312 // manage reconnection
313 try {
314 Thread.sleep((int) Math.rint(delay * 1000));
315 } catch (InterruptedException ie) {
316 // Restore interrupted state...
317 Thread.currentThread().interrupt();
318 }
319 ++nbAttempts;
320 delay *= reconnectDelayFactor;
321
322 }
323
324 } catch (OrekitException oe) {
325 // store the exception so it can be retrieved by Ntrip client
326 exception.set(oe);
327 }
328
329 }
330
331 /** Store a parsed encoded message and notify observers.
332 * @param message parsed message
333 */
334 private void storeAndNotify(final ParsedMessage message) {
335 synchronized (observers) {
336
337 for (int typeCode : Arrays.asList(0, message.getTypeCode())) {
338
339 // store message
340 lastMessages.put(typeCode, message);
341
342 // notify observers
343 final List<MessageObserver> list = observers.get(typeCode);
344 if (list != null) {
345 for (final MessageObserver observer : list) {
346 // notify observer
347 observer.messageAvailable(mountPoint, message);
348 }
349 }
350
351 }
352
353 }
354 }
355
356 /** Reset the circular buffer.
357 */
358 private void resetCircularBuffer() {
359 buffer = new byte[BUFFER_SIZE];
360 readIndex = 0;
361 writeIndex = 0;
362 }
363
364 /** Extract data from input stream.
365 * @param is input stream to extract data from
366 * @return number of byes read or -1
367 * @throws IOException if data cannot be extracted properly
368 */
369 private int fillUp(final InputStream is) throws IOException {
370 final int max = bufferMaxWrite();
371 if (max == 0) {
372 // this should never happen
373 // the buffer is large enough for almost 16 encoded messages, including wrapping frame
374 throw new OrekitInternalError(null);
375 }
376 final int r = is.read(buffer, writeIndex, max);
377 if (r >= 0) {
378 writeIndex = (writeIndex + r) % BUFFER_SIZE;
379 }
380 return r;
381 }
382
383 /** {@inheritDoc} */
384 @Override
385 protected int fetchByte() {
386 if (readIndex == messageEndIndex || readIndex == writeIndex) {
387 return -1;
388 }
389
390 final int ret = buffer[readIndex] & 0xFF;
391 moveRead(1);
392 return ret;
393 }
394
395 /** Get the number of bytes currently in the buffer.
396 * @return number of bytes currently in the buffer
397 */
398 private int bufferSize() {
399 final int n = writeIndex - readIndex;
400 return n >= 0 ? n : BUFFER_SIZE + n;
401 }
402
403 /** Peek a buffer byte without moving read pointer.
404 * @param offset offset counted from read pointer
405 * @return value of the byte at given offset
406 */
407 private int peekByte(final int offset) {
408 return buffer[(readIndex + offset) % BUFFER_SIZE] & 0xFF;
409 }
410
411 /** Move read pointer.
412 * @param n number of bytes to move read pointer
413 */
414 private void moveRead(final int n) {
415 readIndex = (readIndex + n) % BUFFER_SIZE;
416 }
417
418 /** Get the number of bytes that can be added to the buffer without wrapping around.
419 * @return number of bytes that can be added
420 */
421 private int bufferMaxWrite() {
422 if (writeIndex >= readIndex) {
423 return (readIndex == 0 ? BUFFER_SIZE - 1 : BUFFER_SIZE) - writeIndex;
424 } else {
425 return readIndex - writeIndex - 1;
426 }
427 }
428
429 /** Compute QualCom CRC.
430 * @param length length of the byte stream
431 * @return QualCom CRC
432 */
433 private int computeCRC(final int length) {
434 int crc = 0;
435 for (int i = 0; i < length; ++i) {
436 crc = ((crc << 8) ^ CRC_LOOKUP[peekByte(i) ^ (crc >>> 16)]) & (HIGH - 1);
437 }
438 return crc;
439 }
440
441 private List<Integer> extractUsedMessages() {
442 synchronized (observers) {
443
444 // List of needed messages
445 final List<Integer> messages = new ArrayList<>();
446
447 // Loop on observers entries
448 for (Map.Entry<Integer, List<MessageObserver>> entry : observers.entrySet()) {
449 // Extract message type code
450 final int typeCode = entry.getKey();
451 // Add to the list
452 messages.add(typeCode);
453 }
454
455 return messages;
456 }
457 }
458
459 }