Nyx Node
Loading...
Searching...
No Matches
mongoose.c
1/* NyxNode
2 * Author: Jérôme ODIER <jerome.odier@lpsc.in2p3.fr>
3 * SPDX-License-Identifier: GPL-2.0-only (Mongoose backend) or GPL-3.0+
4 */
5
6/*--------------------------------------------------------------------------------------------------------------------*/
7#if !defined(ARDUINO)
8/*--------------------------------------------------------------------------------------------------------------------*/
9
10#include <stdarg.h>
11#include <stdlib.h>
12#include <string.h>
13
14#include "external/mongoose.h"
15
16#include "../nyx_node_internal.h"
17
18/*--------------------------------------------------------------------------------------------------------------------*/
19
20struct nyx_stack_s
21{
22 struct mg_mgr mgr;
23
24 struct mg_mqtt_opts mqtt_opts;
25
26 struct mg_connection *indi_connection;
27 struct mg_connection *mqtt_connection;
28 struct mg_connection *stream_connection;
29};
30
31/*--------------------------------------------------------------------------------------------------------------------*/
32/* LOGGER */
33/*--------------------------------------------------------------------------------------------------------------------*/
34
35void nyx_log(nyx_log_level_t level, STR_t file, STR_t func, int line, STR_t fmt, ...)
36{
37 if(level <= nyx_log_level)
38 {
39 /*------------------------------------------------------------------------------------------------------------*/
40
41 STR_t p;
42
43 p = strrchr(file, '/');
44
45 if(p != NULL)
46 {
47 file = p + 1;
48 }
49 else
50 {
51 p = strrchr(file, '\\');
52
53 if(p != NULL)
54 {
55 file = p + 1;
56 }
57 }
58
59 /*------------------------------------------------------------------------------------------------------------*/
60
61 fprintf(stdout, "%s - %s:%d %s() - ", nyx_log_level_to_str(level), file, line, func);
62
63 /*------------------------------------------------------------------------------------------------------------*/
64
65 va_list ap;
66 va_start(ap, fmt);
67 vfprintf(stdout, fmt, ap);
68 va_end(ap);
69
70 /*------------------------------------------------------------------------------------------------------------*/
71
72 fputc('\r', stdout);
73 fputc('\n', stdout);
74
75 /*------------------------------------------------------------------------------------------------------------*/
76
77 fflush(stdout);
78
79 /*------------------------------------------------------------------------------------------------------------*/
80 }
81
82 if(level == NYX_LOG_LEVEL_FATAL)
83 {
84 exit(1);
85 }
86}
87
88/*--------------------------------------------------------------------------------------------------------------------*/
89/* INDI, MQTT & NSS */
90/*--------------------------------------------------------------------------------------------------------------------*/
91
92void internal_indi_pub(const nyx_node_t *node, nyx_str_t message)
93{
94 if(node->stack->indi_connection != NULL)
95 {
96 if(!mg_send(node->stack->indi_connection, message.buf, message.len))
97 {
98 NYX_LOG_ERROR("Cannot send message to INDI");
99 }
100 }
101}
102
103/*--------------------------------------------------------------------------------------------------------------------*/
104
105void internal_mqtt_sub(const nyx_node_t *node, nyx_str_t topic, int qos)
106{
107 if(node->stack->mqtt_connection != NULL)
108 {
109 struct mg_mqtt_opts opts = {0};
110
111 opts.topic = topic;
113 opts.qos = (uint8_t) qos;
114
115 mg_mqtt_sub(node->stack->mqtt_connection, &opts);
116 }
117}
118
119/*--------------------------------------------------------------------------------------------------------------------*/
120
121void internal_mqtt_pub(const nyx_node_t *node, nyx_str_t topic, nyx_str_t message, int qos)
122{
123 if(node->stack->mqtt_connection != NULL)
124 {
125 struct mg_mqtt_opts opts = {0};
126
127 opts.topic = topic;
128 opts.message = message;
129 opts.qos = (uint8_t) qos;
130
131 mg_mqtt_pub(node->stack->mqtt_connection, &opts);
132 }
133}
134
135/*--------------------------------------------------------------------------------------------------------------------*/
136
137void internal_stream_pub(const nyx_node_t *node, nyx_str_t message)
138{
139 if(node != NULL && node->stack->stream_connection != NULL)
140 {
141 if(!mg_send(node->stack->stream_connection, message.buf, message.len))
142 {
143 NYX_LOG_ERROR("Cannot send message to Nyx-Stream");
144 }
145 }
146}
147
148/*--------------------------------------------------------------------------------------------------------------------*/
149/* STACK */
150/*--------------------------------------------------------------------------------------------------------------------*/
151
152static void _indi_handler(struct mg_connection *connection, int ev, void *ev_data)
153{
154 nyx_node_t *node = connection->fn_data;
155
156 if(ev == MG_EV_OPEN)
157 {
158 NYX_LOG_INFO("%lu INDI OPEN", connection->id);
159
160 node->stack->indi_connection = connection;
161 }
162 else if(ev == MG_EV_CLOSE)
163 {
164 NYX_LOG_INFO("%lu INDI CLOSE", connection->id);
165
166 node->stack->indi_connection = NULL;
167 }
168 else if(ev == MG_EV_ERROR)
169 {
170 NYX_LOG_ERROR("%lu INDI ERROR %s", connection->id, (STR_t) ev_data);
171 }
172 else if(ev == MG_EV_READ)
173 {
174 size_t consumed = node->tcp_handler(node, NYX_NODE_EVENT_MSG, NYX_STR_S(connection->recv.buf, connection->recv.len));
175
176 if(consumed > connection->recv.len)
177 {
178 consumed = connection->recv.len;
179 }
180
181 mg_iobuf_del(
182 &connection->recv,
183 0x0000000000,
184 consumed
185 );
186 }
187}
188
189/*--------------------------------------------------------------------------------------------------------------------*/
190
191static void _mqtt_handler(struct mg_connection *connection, int ev, void *ev_data)
192{
193 nyx_node_t *node = connection->fn_data;
194
195 if(ev == MG_EV_OPEN)
196 {
197 NYX_LOG_INFO("%lu MQTT OPEN 1/2", connection->id);
198
199 node->stack->mqtt_connection = connection;
200 }
201 else if(ev == MG_EV_CLOSE)
202 {
203 NYX_LOG_INFO("%lu MQTT CLOSE", connection->id);
204
205 node->stack->mqtt_connection = NULL;
206 }
207 else if(ev == MG_EV_ERROR)
208 {
209 NYX_LOG_ERROR("%lu MQTT ERROR %s", connection->id, (STR_t) ev_data);
210 }
211 else if(ev == MG_EV_MQTT_OPEN)
212 {
213 NYX_LOG_INFO("%lu MQTT OPEN 2/2", connection->id);
214
215 node->mqtt_handler(
216 node,
217 NYX_NODE_EVENT_OPEN,
218 node->node_id,
219 node->node_id
220 );
221 }
222 else if(ev == MG_EV_MQTT_MSG)
223 {
224 const struct mg_mqtt_message *message = ev_data;
225
226 node->mqtt_handler(
227 node,
228 NYX_NODE_EVENT_MSG,
229 message->topic,
230 message->data
231 );
232 }
233}
234
235/*--------------------------------------------------------------------------------------------------------------------*/
236
237static void _stream_handler(struct mg_connection *connection, int ev, void *ev_data)
238{
239 nyx_node_t *node = connection->fn_data;
240
241 if(ev == MG_EV_OPEN)
242 {
243 NYX_LOG_INFO("%lu STREAM OPEN", connection->id);
244
245 node->stack->stream_connection = connection;
246 }
247 else if(ev == MG_EV_CLOSE)
248 {
249 NYX_LOG_INFO("%lu STREAM CLOSE", connection->id);
250
251 node->stack->stream_connection = NULL;
252 }
253 else if(ev == MG_EV_ERROR)
254 {
255 NYX_LOG_ERROR("%lu STREAM ERROR %s", connection->id, (STR_t) ev_data);
256 }
257 else if(ev == MG_EV_READ)
258 {
259 mg_iobuf_del(
260 &connection->recv,
261 0x0000000000000000,
262 connection->recv.len
263 );
264 }
265}
266
267/*--------------------------------------------------------------------------------------------------------------------*/
268
269static void _retry_timer_handler(void *arg)
270{
271 nyx_node_t *node = arg;
272
273 nyx_stack_t *stack = node->stack;
274
275 /*----------------------------------------------------------------------------------------------------------------*/
276 /* INDI */
277 /*----------------------------------------------------------------------------------------------------------------*/
278
279 if(stack->indi_connection == NULL && node->indi_url != NULL && node->indi_url[0] != '\0')
280 {
281 stack->indi_connection = mg_listen(
282 &stack->mgr,
283 node->indi_url,
284 _indi_handler,
285 node
286 );
287
288 if(stack->indi_connection != NULL)
289 {
290 NYX_LOG_INFO("INDI support is enabled");
291 }
292 }
293
294 /*----------------------------------------------------------------------------------------------------------------*/
295 /* MQTT */
296 /*----------------------------------------------------------------------------------------------------------------*/
297
298 if(stack->mqtt_connection == NULL && node->mqtt_url != NULL && node->mqtt_url[0] != '\0')
299 {
300 stack->mqtt_connection = mg_mqtt_connect(
301 &stack->mgr,
302 node->mqtt_url,
303 &stack->mqtt_opts,
304 _mqtt_handler,
305 node
306 );
307
308 if(stack->mqtt_connection != NULL)
309 {
310 NYX_LOG_INFO("MQTT support is enabled");
311 }
312 }
313
314 /*----------------------------------------------------------------------------------------------------------------*/
315 /* STREAM */
316 /*----------------------------------------------------------------------------------------------------------------*/
317
318 if(stack->stream_connection == NULL && node->nss_url != NULL && node->nss_url[0] != '\0')
319 {
320 stack->stream_connection = mg_connect(
321 &stack->mgr,
322 node->nss_url,
323 _stream_handler,
324 node
325 );
326
327 if(stack->stream_connection != NULL)
328 {
329 NYX_LOG_INFO("Nyx-Stream support is enabled");
330 }
331 }
332
333 /*----------------------------------------------------------------------------------------------------------------*/
334}
335
336/*--------------------------------------------------------------------------------------------------------------------*/
337
338static void _ping_timer_handler(void *arg)
339{
340 nyx_node_ping(arg);
341}
342
343/*--------------------------------------------------------------------------------------------------------------------*/
344
345void internal_stack_initialize(nyx_node_t *node, uint32_t retry_ms)
346{
347 /*----------------------------------------------------------------------------------------------------------------*/
348
349 struct nyx_stack_s *stack = node->stack = nyx_memory_alloc(sizeof(struct nyx_stack_s));
350
351 memset(stack, 0x00, sizeof(struct nyx_stack_s));
352
353 /*----------------------------------------------------------------------------------------------------------------*/
354
355 stack->mqtt_opts.user = mg_str(node->mqtt_username);
356 stack->mqtt_opts.pass = mg_str(node->mqtt_password);
357
358 stack->mqtt_opts.client_id = node->node_id;
359
360 stack->mqtt_opts.version = 0x04;
361 stack->mqtt_opts.clean = true;
362
363 /*----------------------------------------------------------------------------------------------------------------*/
364
365 mg_mgr_init(&stack->mgr);
366
367 /*----------------------------------------------------------------------------------------------------------------*/
368
369 if(node->mqtt_url != NULL && node->mqtt_url[0] != '\0')
370 {
371 nyx_node_add_timer(node, NYX_PING_MS, _ping_timer_handler, node);
372 }
373
374 nyx_node_add_timer(node, retry_ms, _retry_timer_handler, node);
375
376 /*----------------------------------------------------------------------------------------------------------------*/
377}
378
379/*--------------------------------------------------------------------------------------------------------------------*/
380
381void internal_stack_finalize(const nyx_node_t *node)
382{
383 mg_mgr_free(&node->stack->mgr);
384
385 nyx_memory_free(node->stack);
386}
387
388/*--------------------------------------------------------------------------------------------------------------------*/
389
390void nyx_node_add_timer(const nyx_node_t *node, uint32_t interval_ms, void(* callback)(void *), void *arg)
391{
392 mg_timer_add(&node->stack->mgr, interval_ms, MG_TIMER_REPEAT | MG_TIMER_RUN_NOW, callback, arg);
393}
394
395/*--------------------------------------------------------------------------------------------------------------------*/
396
397void nyx_node_poll(const nyx_node_t *node, uint32_t timeout_ms)
398{
399 mg_mgr_poll(&node->stack->mgr, timeout_ms == 0 ? (int) timeout_ms : 10);
400}
401
402/*--------------------------------------------------------------------------------------------------------------------*/
403#endif
404/*--------------------------------------------------------------------------------------------------------------------*/
#define NYX_LOG_INFO(fmt,...)
Logs an info message.
Definition nyx_node.h:229
#define NYX_LOG_ERROR(fmt,...)
Logs an error message.
Definition nyx_node.h:218
enum nyx_log_level_e nyx_log_level_t
Nyx log levels.
#define STR_t
Alias for const char *.
Definition nyx_node.h:71
__NYX_NULLABLE__ buff_t nyx_memory_alloc(__NYX_ZEROABLE__ size_t size)
Similar to libc malloc except that a memory overflow causes the node to stop.
__NYX_ZEROABLE__ size_t nyx_memory_free(__NYX_NULLABLE__ buff_t buff)
Similar to libc free except that it returns the amount of memory freed.
Opaque struct describing a Nyx node.