14#include "external/mongoose.h"
16#include "../nyx_node_internal.h"
24 struct mg_mqtt_opts mqtt_opts;
26 struct mg_connection *indi_connection;
27 struct mg_connection *mqtt_connection;
28 struct mg_connection *stream_connection;
37 if(level <= nyx_log_level)
43 p = strrchr(file,
'/');
51 p = strrchr(file,
'\\');
61 fprintf(stdout,
"%s - %s:%d %s() - ", nyx_log_level_to_str(level), file, line, func);
67 vfprintf(stdout, fmt, ap);
82 if(level == NYX_LOG_LEVEL_FATAL)
92void internal_indi_pub(
const nyx_node_t *node, nyx_str_t message)
94 if(node->stack->indi_connection != NULL)
96 if(!mg_send(node->stack->indi_connection, message.buf, message.len))
105void internal_mqtt_sub(
const nyx_node_t *node, nyx_str_t topic,
int qos)
107 if(node->stack->mqtt_connection != NULL)
109 struct mg_mqtt_opts opts = {0};
113 opts.qos = (uint8_t) qos;
115 mg_mqtt_sub(node->stack->mqtt_connection, &opts);
121void internal_mqtt_pub(
const nyx_node_t *node, nyx_str_t topic, nyx_str_t message,
int qos)
123 if(node->stack->mqtt_connection != NULL)
125 struct mg_mqtt_opts opts = {0};
128 opts.message = message;
129 opts.qos = (uint8_t) qos;
131 mg_mqtt_pub(node->stack->mqtt_connection, &opts);
137void internal_stream_pub(
const nyx_node_t *node, nyx_str_t message)
139 if(node != NULL && node->stack->stream_connection != NULL)
141 if(!mg_send(node->stack->stream_connection, message.buf, message.len))
152static void _indi_handler(
struct mg_connection *connection,
int ev,
void *ev_data)
160 node->stack->indi_connection = connection;
162 else if(ev == MG_EV_CLOSE)
166 node->stack->indi_connection = NULL;
168 else if(ev == MG_EV_ERROR)
172 else if(ev == MG_EV_READ)
174 size_t consumed = node->tcp_handler(node, NYX_NODE_EVENT_MSG, NYX_STR_S(connection->recv.buf, connection->recv.len));
176 if(consumed > connection->recv.len)
178 consumed = connection->recv.len;
191static void _mqtt_handler(
struct mg_connection *connection,
int ev,
void *ev_data)
199 node->stack->mqtt_connection = connection;
201 else if(ev == MG_EV_CLOSE)
205 node->stack->mqtt_connection = NULL;
207 else if(ev == MG_EV_ERROR)
211 else if(ev == MG_EV_MQTT_OPEN)
222 else if(ev == MG_EV_MQTT_MSG)
224 const struct mg_mqtt_message *message = ev_data;
237static void _stream_handler(
struct mg_connection *connection,
int ev,
void *ev_data)
245 node->stack->stream_connection = connection;
247 else if(ev == MG_EV_CLOSE)
251 node->stack->stream_connection = NULL;
253 else if(ev == MG_EV_ERROR)
257 else if(ev == MG_EV_READ)
269static void _retry_timer_handler(
void *arg)
273 nyx_stack_t *stack = node->stack;
279 if(stack->indi_connection == NULL && node->indi_url != NULL && node->indi_url[0] !=
'\0')
281 stack->indi_connection = mg_listen(
288 if(stack->indi_connection != NULL)
298 if(stack->mqtt_connection == NULL && node->mqtt_url != NULL && node->mqtt_url[0] !=
'\0')
300 stack->mqtt_connection = mg_mqtt_connect(
308 if(stack->mqtt_connection != NULL)
318 if(stack->stream_connection == NULL && node->nss_url != NULL && node->nss_url[0] !=
'\0')
320 stack->stream_connection = mg_connect(
327 if(stack->stream_connection != NULL)
338static void _ping_timer_handler(
void *arg)
345void internal_stack_initialize(
nyx_node_t *node, uint32_t retry_ms)
349 struct nyx_stack_s *stack = node->stack =
nyx_memory_alloc(
sizeof(
struct nyx_stack_s));
351 memset(stack, 0x00,
sizeof(
struct nyx_stack_s));
355 stack->mqtt_opts.user = mg_str(node->mqtt_username);
356 stack->mqtt_opts.pass = mg_str(node->mqtt_password);
358 stack->mqtt_opts.client_id = node->node_id;
360 stack->mqtt_opts.version = 0x04;
361 stack->mqtt_opts.clean =
true;
365 mg_mgr_init(&stack->mgr);
369 if(node->mqtt_url != NULL && node->mqtt_url[0] !=
'\0')
371 nyx_node_add_timer(node, NYX_PING_MS, _ping_timer_handler, node);
374 nyx_node_add_timer(node, retry_ms, _retry_timer_handler, node);
381void internal_stack_finalize(
const nyx_node_t *node)
383 mg_mgr_free(&node->stack->mgr);
390void nyx_node_add_timer(
const nyx_node_t *node, uint32_t interval_ms,
void(* callback)(
void *),
void *arg)
392 mg_timer_add(&node->stack->mgr, interval_ms, MG_TIMER_REPEAT | MG_TIMER_RUN_NOW, callback, arg);
397void nyx_node_poll(
const nyx_node_t *node, uint32_t timeout_ms)
399 mg_mgr_poll(&node->stack->mgr, timeout_ms == 0 ? (
int) timeout_ms : 10);
#define NYX_LOG_INFO(fmt,...)
Logs an info message.
#define NYX_LOG_ERROR(fmt,...)
Logs an error message.
enum nyx_log_level_e nyx_log_level_t
Nyx log levels.
#define STR_t
Alias for const char *.
__NYX_NULLABLE__ buff_t nyx_memory_alloc(__NYX_ZEROABLE__ size_t size)
Similar to libc malloc except that a memory overflow causes the node to stop.
__NYX_ZEROABLE__ size_t nyx_memory_free(__NYX_NULLABLE__ buff_t buff)
Similar to libc free except that it returns the amount of memory freed.
Opaque struct describing a Nyx node.