Implement TCP Chat application.

Fix TCP Socket receive buffer read and write position miscalculations.
Fix TCP Socket send window endianness.
Implement non-blocking TCP Sockets, alter timeout logic.
Move network applications to Programs folder.
Fix TCP Socket Accept using wrong timeout value.
Fix TCP Socket Accept not removing and freeing pending connection requests upon connect failure.
This commit is contained in:
TomAwezome 2021-05-14 04:08:38 -04:00
parent 6f6c8df2e9
commit cf7ad1a5a7
7 changed files with 413 additions and 24 deletions

View file

@ -21,7 +21,7 @@
#include "C:/Home/Net/TCP/TCP.HH"
#include "C:/Home/Net/TCP/TCPTree"
#include "C:/Home/Net/TCP/TCP.CC"
#include "C:/Home/Net/TCP/TCPHandler.CC"
#include "C:/Home/Net/TCP/TCPHandler"
#include "C:/Home/Net/DNS"

View file

@ -0,0 +1,135 @@
CTask *chat_display_task = NULL;
CTask *chat_message_task = NULL;
CTCPSocket *tcp = TCPSocket(AF_INET);
U0 ChatDisplayTask(I64)
{ // display received messages.
DocTermNew;
DocCursor;
DocPrint(, "$$WW,1$$");
while (TaskValidate(chat_message_task))
{
Refresh;
}
}
U0 ChatMessageTask(I64)
{ // take in text.
U8 *message;
DocTermNew;
DocPrint(, "$$WW,1$$");
while (message = StrGet("> ",, SGF_SHIFT_ESC_EXIT))
{
DocBottom(chat_display_task->put_doc);
DocPrint(chat_display_task->put_doc,
"$$BG,BLUE$$$$BLACK$$<local>$$FG$$$$BG$$ %s\n", message);
TCPSocketSend(tcp, message, StrLen(message));
DocClear;
DocPrint(, "$$WW,1$$");
}
}
U0 ChatInit()
{
chat_message_task = Spawn(&ChatMessageTask, NULL, "TCP Chat Message");
chat_display_task = Spawn(&ChatDisplayTask, NULL, "TCP Chat");
chat_message_task->win_inhibit = WIG_USER_TASK_DEFAULT;
chat_display_task->win_inhibit = WIG_USER_TASK_DEFAULT;
LBts(&chat_message_task->display_flags, DISPLAYf_SHOW);
LBts(&chat_display_task->display_flags, DISPLAYf_SHOW);
WinFocus(chat_display_task);
WinFocus(chat_message_task);
chat_display_task->win_top = 2;
chat_display_task->win_bottom = TEXT_ROWS / 3;
chat_display_task->win_left = TEXT_COLS / 3;
chat_display_task->win_right = TEXT_COLS / 3 * 2;
chat_message_task->win_top = chat_display_task->win_bottom + 2;
chat_message_task->win_bottom = chat_message_task->win_top + 3;
chat_message_task->win_left = chat_display_task->win_left;
chat_message_task->win_right = chat_display_task->win_right;
}
U0 Chat()
{
CSocketAddressIPV4 socket_addr;
CIPV4Address ip_addr;
U8 *ip_string = StrGet("Server IPV4: ");
U8 *port_string = StrGet("Server Port: ");
I64 port = Str2I64(port_string);
U8 *buffer[ETHERNET_FRAME_SIZE];
I64 receive_val;
while (PresentationToNetwork(AF_INET, ip_string, &ip_addr) == -1)
{
"\nERROR: Bad IP entered. Retry.\n";
ip_string = StrGet("Server IPV4: ");
}
Free(ip_string);
Free(port_string);
socket_addr.port = EndianU16(port);
socket_addr.family = AF_INET;
socket_addr.address.address = ip_addr.address;
tcp->timeout = TCP_TIMEOUT; // use normal timeout for connect()
"\nConnecting to server...\n";
if (TCPSocketConnect(tcp, &socket_addr) != 0)
{
"\nFailed to connect to server.\n";
TCPSocketClose(tcp);
return;
}
else
"\nSuccessfully connected.\n";
ChatInit;
tcp->timeout = 0;
while (TaskValidate(chat_message_task))
{
if ((receive_val = TCPSocketReceive(tcp, buffer, ETHERNET_FRAME_SIZE)) != -1)
{
if (receive_val == 0)
{
"\nServer closed the connection.\n";
Kill(chat_message_task);
break;
}
else if (receive_val > 0)
{
DocBottom(chat_display_task->put_doc);
DocPrint(chat_display_task->put_doc,
"%s\n", buffer);
MemSet(buffer, 0, ETHERNET_FRAME_SIZE);
}
}
Refresh;
}
tcp->timeout = TCP_TIMEOUT;
"\nClosing connected socket...\n";
if (TCPSocketClose(tcp) == 0)
"\nSocket closed.\n";
else
"\nSocket failed to close.\n";
}
Chat;

View file

@ -0,0 +1,226 @@
CTCPSocket *tcp = TCPSocket(AF_INET);
CTCPTreeQueue *connections = CAlloc(sizeof(CTCPTreeQueue)); // head
U8 *buffer = CAlloc(ETHERNET_FRAME_SIZE);
QueueInit(connections);
U0 ChatServerKill()
{
CTCPTreeQueue *conn = connections->next;
CTCPTreeQueue *next_conn;
while (conn != connections)
{
"\nClosing socket @ 0x%0X\n", conn->socket;
TCPSocketClose(conn->socket);
next_conn = conn->next;
Free(conn);
conn = next_conn;
}
"\nClosing listening socket.\n";
TCPSocketClose(tcp);
return;
}
U0 ChatServerBroadcast(CTCPSocket *tcp_socket, I64 length)
{ // Broadcast length bytes of msg in buffer to all but original socket.
CTCPTreeQueue *conn = connections->next;
CTCPSocket *dest_socket;
U8 *ip_string;
CIPV4Address addr;
U8 *message;
U8 *message_prefix;
while (conn != connections)
{
dest_socket = conn->socket;
if (dest_socket != tcp_socket)
{
addr.address = EndianU32(dest_socket->destination_address(CSocketAddressIPV4).address.address);
ip_string = NetworkToPresentation(AF_INET, &addr);
// TODO: is NetworkToPresentation backwards? or, do socket addrs store BE or LE ?
"\nBroacasting msg to %s.\n", ip_string;
addr.address = EndianU32(tcp_socket->destination_address(CSocketAddressIPV4).address.address);
ip_string = NetworkToPresentation(AF_INET, &addr);
// TODO: is NetworkToPresentation backwards? or, do socket addrs store BE or LE ?
message_prefix = MStrPrint("$$BG,PURPLE$$$$BLACK$$<%s>$$FG$$$$BG$$ %%0%dts", ip_string, length);
message = MStrPrint(message_prefix, buffer);
TCPSocketSend(dest_socket, message, StrLen(message));
Free(message);
Free(message_prefix);
Free(ip_string);
}
conn = conn->next;
}
return;
}
U0 ChatServerBroadcastDisconnect()
{
CTCPTreeQueue *conn = connections->next;
CTCPSocket *conn_socket;
U8 *message = MStrPrint("$$BG,LTGRAY$$$$DKGRAY$$Client disconnected. Connected clients: %d$$FG$$$$BG$$",
QueueCount(connections));
while (conn != connections)
{
conn_socket = conn->socket;
TCPSocketSend(conn_socket, message, StrLen(message));
conn = conn->next;
}
Free(message);
}
U0 ChatServerReceive()
{
CTCPTreeQueue *conn = connections->next;
CTCPTreeQueue *next_conn;
CTCPSocket *socket;
I64 message_len;
U8 *ip_string;
CIPV4Address addr;
while (conn != connections)
{
socket = conn->socket;
message_len = TCPSocketReceive(socket, buffer, ETHERNET_FRAME_SIZE);
if (message_len == 0)
{
"\nClosing a connection.\n";
socket->timeout = TCP_TIMEOUT;
TCPSocketClose(socket);
next_conn = conn->next;
QueueRemove(conn);
Free(conn);
conn = next_conn;
ChatServerBroadcastDisconnect();
}
else if (message_len > 0)
{
addr.address = EndianU32(socket->destination_address(CSocketAddressIPV4).address.address);
ip_string = NetworkToPresentation(AF_INET, &addr);
// TODO: is NetworkToPresentation backwards? or, do socket addrs store BE or LE ?
"\nBroadcasting %d byte msg from %s: %Q\n", message_len, ip_string, buffer;
//ClassRep(socket);
ChatServerBroadcast(socket, message_len);
MemSet(buffer, 0, ETHERNET_FRAME_SIZE);
conn = conn->next;
}
else
{
//"\nReceived -1 [error], trying next connection.\n";
conn = conn->next;
}
}
return;
}
U0 ChatServer()
{
CSocketAddressIPV4 socket_addr;
U8 *port_string = StrGet("Server Port: ");
I64 port = Str2I64(port_string);
CTCPSocket *new_socket;
CTCPTreeQueue *new_conn;
U8 *join_msg;
CTCPTreeQueue *conn;
CTCPSocket *conn_socket;
Free(port_string);
socket_addr.port = EndianU16(port);
socket_addr.family = AF_INET;
socket_addr.address.address = INADDR_ANY;
tcp->timeout = 0.3 * JIFFY_FREQ;
"\nTrying to bind socket.\n";
if (TCPSocketBind(tcp, &socket_addr) == 0)
"\nSocket bound.\n";
else
{
"\nFailed to bind socket.\n";
ChatServerKill;
return;
}
"\nTrying to listen on socket.\n";
if (TCPSocketListen(tcp, 5) == 0)
"\nSocket now listening.\n";
else
{
"\nFailed to listen on socket.\n";
ChatServerKill;
return;
}
while (CharScan != CH_SHIFT_ESC)
{
new_socket = TCPSocketAccept(tcp);
if (new_socket > 0)
{
"\nNew connection.\n";
new_conn = CAlloc(sizeof(CTCPTreeQueue));
new_conn->socket = new_socket;
new_socket->timeout = 0; // Set new connection non-blocking.
join_msg = MStrPrint("$$BG,LTGRAY$$$$DKGRAY$$Connected clients: %d$$FG$$$$BG$$", QueueCount(connections) + 1);
TCPSocketSend(new_socket, join_msg, StrLen(join_msg));
Free(join_msg);
join_msg = MStrPrint("$$BG,LTGRAY$$$$DKGRAY$$New connection. Connected clients: %d$$FG$$$$BG$$",
QueueCount(connections) + 1);
conn = connections->next;
while (conn != connections)
{
"\nNotifying clients of new connection.\n";
conn_socket = conn->socket;
TCPSocketSend(conn_socket, join_msg, StrLen(join_msg));
conn = conn->next;
}
Free(join_msg);
QueueInsertRev(new_conn, connections);
}
ChatServerReceive; // Receive & Broadcast
Sleep(50);
}
ChatServerKill;
}
ChatServer;

View file

@ -735,7 +735,6 @@ I64 TCPSocketBind(CTCPSocket *tcp_socket, CSocketAddressStorage *address)
switch (address->family)
{
case AF_INET:
// TODO: will any INADDR_ANY sockets bound at the port break this?
if (TCPTreeNodeQueueIPV4Find(ipv4_destination->address.address, temp_node, TRUE))
{
NetErr("TCP SOCKET BIND: Address already in Bound Socket Tree !");
@ -1142,10 +1141,10 @@ CTCPSocket *TCPSocketAccept(CTCPSocket *tcp_socket)
if (tcp_socket->state != TCP_STATE_LISTEN)
{
NetErr("TCP SOCKET LISTEN: TCP Socket must be in LISTEN state.");
return -1;
return NULL;
}
timeout = counts.jiffies + new_socket->timeout * JIFFY_FREQ / 1000;
timeout = counts.jiffies + tcp_socket->timeout * JIFFY_FREQ / 1000;
while ((pending = tcp_socket->accept_queue->next) == tcp_socket->accept_queue)
{
if (counts.jiffies > timeout)
@ -1155,6 +1154,8 @@ CTCPSocket *TCPSocketAccept(CTCPSocket *tcp_socket)
// Yield;
}
QueueRemove(pending); // whether successful accept() or not, remove pending connection.
// TODO: rework accept logic to handle IPV6 addresses
new_socket = TCPSocket(AF_INET);
@ -1173,7 +1174,11 @@ CTCPSocket *TCPSocketAccept(CTCPSocket *tcp_socket)
NetDebug("TCP SOCKET ACCEPT: Attempting to Bind to pending connection %0X @ src/dst ports %d,%d.",
pending->ipv4_address, ipv4_address.port, pending->port);
TCPSocketBind(new_socket, &ipv4_address);
if (TCPSocketBind(new_socket, &ipv4_address) == -1)
{
Free(pending);
return NULL;
}
temp_addr = &new_socket->destination_address;
@ -1202,9 +1207,12 @@ CTCPSocket *TCPSocketAccept(CTCPSocket *tcp_socket)
if (new_socket->state != TCP_STATE_ESTABLISHED)
{
TCPSocketClose(new_socket);
Free(pending);
return NULL;
}
Free(pending);
new_socket->socket->state = SOCKET_STATE_OPEN;
return new_socket;
@ -1229,14 +1237,19 @@ I64 TCPSocketReceive(CTCPSocket *tcp_socket, U8 *buffer, I64 length)
while ((tcp_socket->state == TCP_STATE_ESTABLISHED || tcp_socket->state == TCP_STATE_FIN_WAIT1) &&
tcp_socket->read_position == tcp_socket->write_position)
{
if (counts.jiffies > timeout)
{
NetErr("TCP SOCKET RECEIVE: Timed out.");
return -1;
}
TCPCheckACKQueue(tcp_socket);
Sleep(1);
if (counts.jiffies > timeout)
{
// if (tcp_socket->timeout != 0) // Don't flood NetLog on non-blocking receives.
// NetErr("TCP SOCKET RECEIVE: Timed out.");
// return -1;
break;
}
}
// Shrine has TODO: Should still be able to receive in closing states ...
@ -1244,6 +1257,16 @@ I64 TCPSocketReceive(CTCPSocket *tcp_socket, U8 *buffer, I64 length)
tcp_socket->read_position == tcp_socket->write_position || length == 0)
return 0;
if (counts.jiffies > timeout)
{
if (tcp_socket->timeout != 0) // Don't flood NetLog on non-blocking receives.
NetErr("TCP SOCKET RECEIVE: Timed out.");
return -1;
}
read_position = tcp_socket->read_position;
write_position = tcp_socket->write_position;
@ -1257,15 +1280,16 @@ I64 TCPSocketReceive(CTCPSocket *tcp_socket, U8 *buffer, I64 length)
MemCopy(buffer, tcp_socket->receive_buffer + read_position, step);
buffer += step;
length -= step;
read_position = (read_position + step) & (tcp_socket->receive_buffer_size - 1);
// read_position = (read_position + step) & (tcp_socket->receive_buffer_size - 1);
read_position = (read_position + step) % tcp_socket->receive_buffer_size;
read_total += step;
}
if (length)
if (length > 0)
{
step = write_position - read_position;
if (step < length)
if (step > length)
step = length;
MemCopy(buffer, tcp_socket->receive_buffer + read_position, step);
@ -1289,18 +1313,12 @@ I64 TCPSocketSend(CTCPSocket *tcp_socket, U8 *buffer, I64 length)
if (!SocketSend(tcp_socket->socket))
{
NetErr("TCP SOCKET SEND: Failed, Socket state-machine must be in OPEN state.");
return NULL;
return -1;
}
while ((tcp_socket->state == TCP_STATE_ESTABLISHED || tcp_socket->state == TCP_STATE_CLOSE_WAIT) &&
length)
{
if (counts.jiffies > timeout)
{
NetErr("TCP SOCKET SEND: Timed out.");
return -1;
}
send_length = (tcp_socket->first_unacked_seq + tcp_socket->send_window - tcp_socket->next_send_seq_num) & 0xFFFFFFFF;
// Shrine has TODO: Keep trying, tie to timeout: RFC 793 "Managing The Window"
@ -1323,7 +1341,7 @@ I64 TCPSocketSend(CTCPSocket *tcp_socket, U8 *buffer, I64 length)
if (send_length > tcp_socket->max_segment_size)
send_length = tcp_socket->max_segment_size;
NetDebug("TCP SOCKET SEND: Trying TCPSendData().");
NetDebug("TCP SOCKET SEND: Trying TCPSendData() of %d bytes.", send_length);
if (TCPSendData(tcp_socket, TCPF_ACK, buffer, send_length) < 0)
{ // Stall until outgoing data acknowledged.
if (sent_total > 0)
@ -1342,6 +1360,14 @@ I64 TCPSocketSend(CTCPSocket *tcp_socket, U8 *buffer, I64 length)
sent_total += send_length;
}
}
if (counts.jiffies > timeout)
{
if (tcp_socket->timeout != 0) // Don't flood NetLog on non-blocking sends.
NetErr("TCP SOCKET SEND: Timed out.");
break;
}
}
return sent_total;

View file

@ -5,7 +5,8 @@ Bool TCPHandleValidSEQ(CTCPSocket *tcp_socket, CTCPHeader *header, U32 segment_s
I64 next_position;
I64 i;
tcp_socket->send_window = header->window_size;
// tcp_socket->send_window = header->window_size;
tcp_socket->send_window = EndianU16(header->window_size);
// Shrine doesn't use EndianU16 (ntohs)? are these all being stored network byte order? ...
switch (tcp_socket->state)
@ -28,7 +29,8 @@ Bool TCPHandleValidSEQ(CTCPSocket *tcp_socket, CTCPHeader *header, U32 segment_s
for (i = 0; i < length; i++)
{
next_position = (write_position + 1) & (tcp_socket->receive_buffer_size - 1);
// next_position = (write_position + 1) & (tcp_socket->receive_buffer_size - 1);
next_position = (write_position + 1) % tcp_socket->receive_buffer_size;
if (next_position == tcp_socket->read_position)
break; // ...?