Client/Server data setup working!

This commit is contained in:
Scott Duensing 2022-01-13 20:00:01 -06:00
parent d7dcd43c5f
commit 5a840ab1bf
8 changed files with 96 additions and 39 deletions

View file

@ -260,12 +260,16 @@ int main(int argc, char *argv[]) {
sh_new_strdup(__runtimeData.integers); sh_new_strdup(__runtimeData.integers);
sh_new_strdup(__runtimeData.strings); sh_new_strdup(__runtimeData.strings);
// ***TODO*** Load tables from disk cache
//taskCreate(taskComDebugLoop, NULL); //taskCreate(taskComDebugLoop, NULL);
taskCreate(taskWelcome, NULL); taskCreate(taskWelcome, NULL);
taskCreate(taskGuiEventLoop, NULL); taskCreate(taskGuiEventLoop, NULL);
taskRun(); taskRun();
// ***TODO*** Write tables from disk cache
// Free integer table. // Free integer table.
if (__runtimeData.integers) { if (__runtimeData.integers) {
while (shlen(__runtimeData.integers) > 0) { while (shlen(__runtimeData.integers) > 0) {

View file

@ -85,39 +85,42 @@ void taskNetwork(void *data) {
do { do {
//***TODO*** Detect disconnection. Maybe have callbacks registered that can notify tasks? // ***TODO*** Detect disconnection. Maybe have callbacks registered that can notify tasks?
// Read pending bytes. // Read pending bytes.
r = comRead(__configData.serialCom - 1, buffer, 1024); r = comRead(__configData.serialCom - 1, buffer, 1024);
if (r > 0) { // New data or not, process anything in the queue.
// Decode them into packets. if (packetDecode(__packetThreadData, &decoded, buffer, r)) {
if (packetDecode(__packetThreadData, &decoded, buffer, r)) { // Is this a PONG? If so, ignore it.
// Is this a PONG? If so, ignore it. if (decoded.packetType == PACKET_TYPE_PONG) {
if (decoded.packetType == PACKET_TYPE_PONG) continue; packetDecodeDataStaticDestroy(&decoded);
continue;
// Copy the packet out.
NEW(PacketDecodeDataT, packet);
packet->channel = decoded.channel;
packet->length = decoded.length;
packet->packetType = decoded.packetType;
packet->data = (char *)malloc(decoded.length);
memcpy(packet->data, decoded.data, decoded.length);
// Is there a list of packets for this channel already?
r = hmgeti(_packets, packet->channel);
if (r < 0) {
// No. Create dynamic array for this channel.
hmput(_packets, packet->channel, NULL);
r = hmgeti(_packets, packet->channel);
}
// Add new packet to existing list.
arrput(_packets[r].value, packet);
} }
} else {
// No bytes pending, yield to UI. // Copy the packet out.
taskYield(); NEW(PacketDecodeDataT, packet);
packet->channel = decoded.channel;
packet->length = decoded.length;
packet->packetType = decoded.packetType;
packet->data = (char *)malloc(decoded.length);
memcpy(packet->data, decoded.data, decoded.length);
// Is there a list of packets for this channel already?
r = hmgeti(_packets, packet->channel);
if (r < 0) {
// No. Create dynamic array for this channel.
hmput(_packets, packet->channel, NULL);
r = hmgeti(_packets, packet->channel);
}
// Add new packet to existing list.
arrput(_packets[r].value, packet);
packetDecodeDataStaticDestroy(&decoded);
} }
// Yield to UI.
taskYield();
// Send a ping to the server every 5 seconds, because, ping. // Send a ping to the server every 5 seconds, because, ping.
if (__timerSecondTick) { if (__timerSecondTick) {
pingTimeout++; pingTimeout++;

View file

@ -136,10 +136,6 @@ static void taskConnectClick(void *data) {
return; return;
} }
// Connected! Show icon.
widgetVisibleSet(W(_picConnect), 1);
taskYield();
// Start packet handler and negotiate encryption. // Start packet handler and negotiate encryption.
taskCreate(taskNetwork, NULL); taskCreate(taskNetwork, NULL);
packetEncryptionSetup(__packetThreadData); packetEncryptionSetup(__packetThreadData);
@ -155,6 +151,8 @@ static void taskConnectClick(void *data) {
return; return;
} }
// ***TODO*** Should probably cleanly handle a full server here with some kind of SERVER_FULL packet.
// Wait for version and table packets to arrive. // Wait for version and table packets to arrive.
timeout = 10; timeout = 10;
do { do {
@ -170,6 +168,7 @@ static void taskConnectClick(void *data) {
break; break;
case PACKET_TYPE_PROCEED: case PACKET_TYPE_PROCEED:
logWrite("Received PACKET_TYPE_PROCEED\n");
waiting = 0; waiting = 0;
break; break;
@ -217,6 +216,14 @@ static void taskConnectClick(void *data) {
return; return;
} }
// Connected! Show icon.
widgetVisibleSet(W(_picConnect), 1);
timeout = 6; // Roughly 1.5 seconds.
while (timeout > 0) {
taskYield();
if (__timerQuarterSecondTick) timeout--;
}
// Switch to Login window. // Switch to Login window.
guiDelete(D(_winWelcome)); guiDelete(D(_winWelcome));
taskCreate(taskLogin, NULL); taskCreate(taskLogin, NULL);

View file

@ -50,7 +50,7 @@ static uint8_t clientDequeuePacket(ClientThreadT *client) {
// New data or not, process anything in the queue. // New data or not, process anything in the queue.
if (packetDecode(client->packetThreadData, &decode, data, length)) { if (packetDecode(client->packetThreadData, &decode, data, length)) {
clientProcessPacket(client, &decode); clientProcessPacket(client, &decode);
DEL(decode.data); if (decode.data) DEL(decode.data);
if (packet) { if (packet) {
DEL(packet->data); DEL(packet->data);
DEL(packet); DEL(packet);
@ -257,8 +257,14 @@ void *clientThread(void *data) {
} }
// Clean up client data on the way out. // Clean up client data on the way out.
while (arrlen(client->packetQueue) > 0) {
if (client->packetQueue[0]->data) DEL(client->packetQueue[0]->data);
arrdel(client->packetQueue, 0);
}
arrfree(client->packetQueue); arrfree(client->packetQueue);
pthread_mutex_destroy(&client->packetQueueMutex); pthread_mutex_destroy(&client->packetQueueMutex);
packetThreadDataDestroy(&client->packetThreadData);
DEL(client);
pthread_exit(NULL); pthread_exit(NULL);
} }

View file

@ -64,7 +64,6 @@ void consoleRun(void) {
uint8_t commandOk = 0; uint8_t commandOk = 0;
struct timespec remaining = { 0, 0 }; struct timespec remaining = { 0, 0 };
struct timespec sleepTime = { 0, 1000000000/4 }; // 1/4th second struct timespec sleepTime = { 0, 1000000000/4 }; // 1/4th second
size_t i = 0;
if (pthread_mutex_init(&_messageQueueMutex, NULL)) utilDie("Unable to create console message queue mutex.\n"); if (pthread_mutex_init(&_messageQueueMutex, NULL)) utilDie("Unable to create console message queue mutex.\n");
@ -168,8 +167,9 @@ void consoleRun(void) {
} }
// Anything left in the message queue? // Anything left in the message queue?
for (i=0; i<arrlenu(_consoleMessageQueue); i++) { while (arrlen(_consoleMessageQueue) > 0) {
free(_consoleMessageQueue[i]); DEL(_consoleMessageQueue[0]);
arrdel(_consoleMessageQueue, 0);
} }
arrfree(_consoleMessageQueue); arrfree(_consoleMessageQueue);

View file

@ -32,7 +32,7 @@
#include <pthread.h> #include <pthread.h>
// Should be after system headers in this file. // Should be after system headers in this file.
//#define MEMORY_CHECK_ENABLED #define MEMORY_CHECK_ENABLED
#include "memory.h" #include "memory.h"
// Now our headers. // Now our headers.

View file

@ -23,6 +23,24 @@
// Encryption: https://erev0s.com/blog/tiny-aes-cbc-mode-pkcs7-padding-written-c // Encryption: https://erev0s.com/blog/tiny-aes-cbc-mode-pkcs7-padding-written-c
/*
* NOTE:
*
* The reliable transport / packet retry code has been commented out.
* Structly speaking, it isn't needed. Enet handles reliable delivery
* across the internet. The only place there can be corruption is
* between a real serial port and the enet softmodem.
*
* In any case, if the code ends up being re-enabled, the sequence
* numbers have no bounding and will run off the end of the history
* array. Not only that, but due to the limited number of sequence
* numbers available, there needs to be a sending queue added for
* when the server attempts to send more packets than there are
* history slots.
*
*/
#include "packet.h" #include "packet.h"
#include "primes.h" #include "primes.h"
@ -65,7 +83,7 @@ static uint8_t packetCRC(char *data, uint16_t length, uint8_t startAt) {
uint8_t packetDecode(PacketThreadDataT *threadData, PacketDecodeDataT *decodeData, char *input, uint16_t inputLength) { uint8_t packetDecode(PacketThreadDataT *threadData, PacketDecodeDataT *decodeData, char *input, uint16_t inputLength) {
uint8_t sequence = 0; //uint8_t sequence = 0;
uint16_t unpadded = 0; uint16_t unpadded = 0;
int32_t x = 0; int32_t x = 0;
char c = 0; char c = 0;
@ -107,6 +125,7 @@ uint8_t packetDecode(PacketThreadDataT *threadData, PacketDecodeDataT *decodeDat
// Check CRC. // Check CRC.
if ((uint8_t)threadData->decodeBuffer[threadData->length - 1] != packetCRC(threadData->decodeBuffer, threadData->length - 1, 0)) continue; if ((uint8_t)threadData->decodeBuffer[threadData->length - 1] != packetCRC(threadData->decodeBuffer, threadData->length - 1, 0)) continue;
/*
// Get sequence value. // Get sequence value.
sequence = ((uint8_t)threadData->decodeBuffer[0]) & 0x1f; sequence = ((uint8_t)threadData->decodeBuffer[0]) & 0x1f;
@ -151,6 +170,7 @@ uint8_t packetDecode(PacketThreadDataT *threadData, PacketDecodeDataT *decodeDat
packetSend(threadData, &encoded); packetSend(threadData, &encoded);
continue; continue;
} }
*/
// Fill decoded data fields. // Fill decoded data fields.
decodeData->packetType = threadData->decodeBuffer[1]; decodeData->packetType = threadData->decodeBuffer[1];
@ -241,13 +261,21 @@ uint8_t packetDecode(PacketThreadDataT *threadData, PacketDecodeDataT *decodeDat
void packetDecodeDataDestroy(PacketDecodeDataT **packet) { void packetDecodeDataDestroy(PacketDecodeDataT **packet) {
PacketDecodeDataT *d = *packet; PacketDecodeDataT *d = *packet;
free(d->data); packetDecodeDataStaticDestroy(d);
free(d); free(d);
d = NULL; d = NULL;
*packet = d; *packet = d;
} }
void packetDecodeDataStaticDestroy(PacketDecodeDataT *packet) {
if (packet->data) {
free(packet->data);
packet->data = NULL;
}
}
static uint16_t packetDHCompute(uint32_t a, uint32_t m, uint32_t n) { static uint16_t packetDHCompute(uint32_t a, uint32_t m, uint32_t n) {
uint32_t r = 0; uint32_t r = 0;
uint32_t y = 1; uint32_t y = 1;
@ -282,9 +310,11 @@ uint8_t packetEncode(PacketThreadDataT *threadData, PacketEncodeDataT *data, cha
data->dataPointer = NULL; data->dataPointer = NULL;
data->length = 0; data->length = 0;
/*
if (data->control == PACKET_CONTROL_DAT) { if (data->control == PACKET_CONTROL_DAT) {
data->sequence = threadData->sequence++; data->sequence = threadData->sequence++;
} }
*/
// Make needed header bytes. // Make needed header bytes.
control = (((uint8_t)data->control) << 6) + (data->encrypt << 5) + (data->sequence & 0x1f); control = (((uint8_t)data->control) << 6) + (data->encrypt << 5) + (data->sequence & 0x1f);
@ -387,9 +417,10 @@ void packetSend(PacketThreadDataT *threadData, PacketEncodeDataT *data) {
logWrite("Invalid PACKET_CONTROL!\n\r"); logWrite("Invalid PACKET_CONTROL!\n\r");
} }
/*
// Add to history? // Add to history?
if (data->control == PACKET_CONTROL_DAT) { if (data->control == PACKET_CONTROL_DAT) {
//***TODO*** Must change control to use CONTROL_RTX & fix framing changes // ***TODO*** Must change control to use CONTROL_RTX & fix framing changes
threadData->history[threadData->historyPosition].sequence = data->sequence; threadData->history[threadData->historyPosition].sequence = data->sequence;
threadData->history[threadData->historyPosition].length = data->length; threadData->history[threadData->historyPosition].length = data->length;
memcpy(threadData->history[threadData->historyPosition].data, data->dataPointer, data->length); memcpy(threadData->history[threadData->historyPosition].data, data->dataPointer, data->length);
@ -398,6 +429,7 @@ void packetSend(PacketThreadDataT *threadData, PacketEncodeDataT *data) {
threadData->historyPosition = 0; threadData->historyPosition = 0;
} }
} }
*/
// Mark invalid so caller has to change it. // Mark invalid so caller has to change it.
data->control = PACKET_CONTROL_BAD; data->control = PACKET_CONTROL_BAD;
@ -414,9 +446,11 @@ PacketThreadDataT *packetThreadDataCreate(void *senderData) {
data = (PacketThreadDataT *)malloc(sizeof(PacketThreadDataT)); data = (PacketThreadDataT *)malloc(sizeof(PacketThreadDataT));
if (data) { if (data) {
/*
data->sequence = 0; data->sequence = 0;
data->lastRemoteSequence = 0; data->lastRemoteSequence = 0;
data->historyPosition = 0; data->historyPosition = 0;
*/
data->decodeQueueHead = 0; data->decodeQueueHead = 0;
data->decodeQueueTail = 0; data->decodeQueueTail = 0;
data->newPacket = 1; data->newPacket = 1;

View file

@ -90,10 +90,12 @@ typedef struct PacketHistoryDataS {
typedef struct PacketThreadDataS { typedef struct PacketThreadDataS {
// Internal state per thread for packet processing. // Internal state per thread for packet processing.
/*
uint8_t sequence; uint8_t sequence;
uint8_t lastRemoteSequence; uint8_t lastRemoteSequence;
PacketHistoryDataT history[PACKET_SEQUENCE_MAX]; PacketHistoryDataT history[PACKET_SEQUENCE_MAX];
uint8_t historyPosition; uint8_t historyPosition;
*/
char decodeBuffer[PACKET_MAX]; char decodeBuffer[PACKET_MAX];
char encodeBuffer[PACKET_BUFFER_SIZE]; char encodeBuffer[PACKET_BUFFER_SIZE];
char decodeQueue[PACKET_INPUT_QUEUE_SIZE]; char decodeQueue[PACKET_INPUT_QUEUE_SIZE];
@ -119,6 +121,7 @@ typedef void (*packetSender)(char *data, uint32_t length, void *userData);
uint8_t packetDecode(PacketThreadDataT *threadData, PacketDecodeDataT *decodeData, char *input, uint16_t inputLength); uint8_t packetDecode(PacketThreadDataT *threadData, PacketDecodeDataT *decodeData, char *input, uint16_t inputLength);
void packetDecodeDataDestroy(PacketDecodeDataT **packet); void packetDecodeDataDestroy(PacketDecodeDataT **packet);
void packetDecodeDataStaticDestroy(PacketDecodeDataT *packet);
uint8_t packetEncode(PacketThreadDataT *threadData, PacketEncodeDataT *data, char *input, uint16_t length); uint8_t packetEncode(PacketThreadDataT *threadData, PacketEncodeDataT *data, char *input, uint16_t length);
uint8_t packetEncryptionReady(void); uint8_t packetEncryptionReady(void);
void packetEncryptionSetup(PacketThreadDataT *threadData); void packetEncryptionSetup(PacketThreadDataT *threadData);