Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/openapi/components/schemas/config/nodes/socket.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ allOf:
- udp
- ip
- eth
- tcp-client
- tcp-server
default: udp
description: |
Select the network layer which should be used for the socket. Please note that `eth` can only be used locally in a LAN as it contains no routing information for the internet.
Expand Down
2 changes: 2 additions & 0 deletions include/villas/nodes/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ class NodeCompat;

struct Socket {
int sd; // The socket descriptor
int clt_sd; // TCP client socket descriptor
int verify_source; // Verify the source address of incoming packets against socket::remote.
bool tcp_connected = false; // TCP connection status bit

enum SocketLayer
layer; // The OSI / IP layer which should be used for this socket
Expand Down
2 changes: 1 addition & 1 deletion include/villas/socket_addr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ union sockaddr_union {
namespace villas {
namespace node {

enum class SocketLayer { ETH, IP, UDP, UNIX };
enum class SocketLayer { ETH, IP, UDP, UNIX, TCP_CLIENT, TCP_SERVER};

/* Generate printable socket address depending on the address family
*
Expand Down
115 changes: 111 additions & 4 deletions lib/nodes/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
#include <villas/kernel/nl.hpp>
#endif // WITH_NETEM

#define MAX_CONNECTION_RETRIES 40
#define RETRIES_DELAY 2

using namespace villas;
using namespace villas::utils;
using namespace villas::node;
Expand Down Expand Up @@ -97,6 +100,14 @@ char *villas::node::socket_print(NodeCompat *n) {
case SocketLayer::UNIX:
layer = "unix";
break;

case SocketLayer::TCP_SERVER:
layer = "tcp-server";
break;

case SocketLayer::TCP_CLIENT:
layer = "tcp-client";
break;
}

char *local = socket_print_addr((struct sockaddr *)&s->in.saddr);
Expand Down Expand Up @@ -195,6 +206,11 @@ int villas::node::socket_start(NodeCompat *n) {
s->sd = socket(s->in.saddr.sa.sa_family, SOCK_DGRAM, 0);
break;

case SocketLayer::TCP_SERVER:
case SocketLayer::TCP_CLIENT:
s->sd = socket(s->in.saddr.sa.sa_family, SOCK_STREAM, 0);
break;

default:
throw RuntimeError("Invalid socket type!");
}
Expand Down Expand Up @@ -233,7 +249,11 @@ int villas::node::socket_start(NodeCompat *n) {
addrlen = sizeof(s->in.saddr);
}

ret = bind(s->sd, (struct sockaddr *)&s->in.saddr, addrlen);
if (s->layer != SocketLayer::TCP_CLIENT)
ret = bind(s->sd, (struct sockaddr *)&s->in.saddr, addrlen);
else
ret = 0;

if (ret < 0)
throw SystemError("Failed to bind socket");

Expand All @@ -258,6 +278,8 @@ int villas::node::socket_start(NodeCompat *n) {
int prio;
switch (s->layer) {
case SocketLayer::UDP:
case SocketLayer::TCP_SERVER:
case SocketLayer::TCP_CLIENT:
case SocketLayer::IP:
prio = IPTOS_LOWDELAY;
if (setsockopt(s->sd, IPPROTO_IP, IP_TOS, &prio, sizeof(prio)))
Expand Down Expand Up @@ -316,7 +338,18 @@ int villas::node::socket_stop(NodeCompat *n) {
}

if (s->sd >= 0) {
// Close client socket descriptor.
if (s->layer == SocketLayer::TCP_SERVER) {
ret = close(s->clt_sd);
if (ret)
throw SystemError("Failed to close TCP client socket descriptor");
}

ret = close(s->sd);

// Reset socket descriptor.
s->sd = -1;

if (ret)
return ret;
}
Expand All @@ -327,6 +360,47 @@ int villas::node::socket_stop(NodeCompat *n) {
return 0;
}

static void socket_tcp_connection(NodeCompat *n, Socket *s) {
int ret;
if (s->layer == SocketLayer::TCP_CLIENT) {
if (s->sd >= 0) {
ret = close(s->sd);
if (ret < 0)
throw SystemError("Failed to close socket descriptor");
}
s->sd = socket(s->in.saddr.sa.sa_family, SOCK_STREAM, 0);
if (s->sd < 0)
throw SystemError("Failed to create socket");
// Attempt to connect to TCP server.
int retries = 0;
while (retries < MAX_CONNECTION_RETRIES) {
n->logger->info("Attempting to connect to TCP server: attempt={}...", retries + 1);
ret = connect(s->sd, (struct sockaddr *)&s->out.saddr, sizeof(s->in.saddr));
if (ret == 0) {
s->tcp_connected = true;
break;
} else {
retries++;
if (retries < MAX_CONNECTION_RETRIES) {
sleep(RETRIES_DELAY);
}
}
}
if (ret < 0)
throw SystemError("Failed to conenct to TCP server");
} else if (s->layer == SocketLayer::TCP_SERVER) {
ret = listen(s->sd, 5);
if (ret < 0)
throw SystemError("Failed to listen for TCP client connection");
// Accept client connection and get client socket descriptor.
s->clt_sd = accept(s->sd, nullptr, nullptr);
if (s->clt_sd < 0) {
throw SystemError("Failed to accept TCP client connection");
}
s->tcp_connected = true;
}
}

int villas::node::socket_read(NodeCompat *n, struct Sample *const smps[],
unsigned cnt) {
int ret;
Expand All @@ -340,14 +414,34 @@ int villas::node::socket_read(NodeCompat *n, struct Sample *const smps[],
socklen_t srclen = sizeof(src);

// Receive next sample
bytes = recvfrom(s->sd, s->in.buf, s->in.buflen, 0, &src.sa, &srclen);

if (s->layer == SocketLayer::TCP_CLIENT) {
// Receive data from server.
if (!s->tcp_connected)
socket_tcp_connection(n, s);

bytes = recv(s->sd, s->in.buf, s->in.buflen, 0);
} else if (s->layer == SocketLayer::TCP_SERVER) {
// Receive data from client.
if (!s->tcp_connected)
socket_tcp_connection(n, s);

bytes = recv(s->clt_sd, s->in.buf, s->in.buflen, 0);
} else {
bytes = recvfrom(s->sd, s->in.buf, s->in.buflen, 0, &src.sa, &srclen);
}

if (bytes < 0) {
if (errno == EINTR)
return -1;

throw SystemError("Failed recvfrom()");
} else if (bytes == 0)
} else if (bytes == 0) {
if (s->layer == SocketLayer::TCP_CLIENT || s->layer == SocketLayer::TCP_SERVER)
s->tcp_connected = false;

return 0;
}

ptr = s->in.buf;

Expand Down Expand Up @@ -445,8 +539,17 @@ int villas::node::socket_write(NodeCompat *n, struct Sample *const smps[],
}

retry2:
bytes = sendto(s->sd, s->out.buf, wbytes, 0, (struct sockaddr *)&s->out.saddr,
if (s->layer == SocketLayer::TCP_CLIENT) {
// Send data to TCP server.
bytes = send(s->sd, s->out.buf, wbytes, 0);
} else if (s->layer == SocketLayer::TCP_SERVER) {
// Send data to TCP client.
bytes = send(s->clt_sd, s->out.buf, wbytes, 0);
} else {
bytes = sendto(s->sd, s->out.buf, wbytes, 0, (struct sockaddr *)&s->out.saddr,
addrlen);
}

if (bytes < 0) {
if ((errno == EPERM) || (errno == ENOENT && s->layer == SocketLayer::UNIX))
n->logger->warn("Failed sendto(): {}", strerror(errno));
Expand Down Expand Up @@ -505,6 +608,10 @@ int villas::node::socket_parse(NodeCompat *n, json_t *json) {
s->layer = SocketLayer::UDP;
else if (!strcmp(layer, "unix") || !strcmp(layer, "local"))
s->layer = SocketLayer::UNIX;
else if (!strcmp(layer, "tcp-client"))
s->layer = SocketLayer::TCP_CLIENT;
else if (!strcmp(layer, "tcp-server"))
s->layer = SocketLayer::TCP_SERVER;
else
throw SystemError("Invalid layer '{}'", layer);
}
Expand Down
6 changes: 6 additions & 0 deletions lib/socket_addr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ int villas::node::socket_parse_address(const char *addr, struct sockaddr *saddr,
hint.ai_protocol = IPPROTO_UDP;
break;

case SocketLayer::TCP_CLIENT:
case SocketLayer::TCP_SERVER:
hint.ai_socktype = SOCK_STREAM;
hint.ai_protocol = IPPROTO_TCP;
break;

default:
throw RuntimeError("Invalid address type");
}
Expand Down
80 changes: 80 additions & 0 deletions tests/integration/node-loopback-socket.sh
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,83 @@ wait %%
# Send / Receive data to node
VILLAS_LOG_PREFIX="[compare] " \
villas compare input.dat output.dat

cat > config.json <<EOF
{
"nodes": {
"node1": {
"type": "socket",
"layer": "tcp-server",
"in": {
"address": "127.0.0.1:12002",
"signals": {
"type": "float",
"count": 1
}
},
"out": {
"address": "127.0.0.1:12003"
}
},
"node2": {
"type": "socket",
"layer": "tcp-client",
"in": {
"address": "127.0.0.1:12003",
"signals": {
"type": "float",
"count": 1
}
},
"out": {
"address": "127.0.0.1:12002"
}
},
"siggen": {
"type": "signal",
"signal": "random",
"limit": 10
}
},
"paths": [
{
"in": "siggen",
"out": "node2",
"hooks": [
{
"type": "print",

"output": "input.dat"
}
]
},
{
"in": "node1",
"out": "node1"
},
{
"in": "node2",
"hooks": [
{
"type": "print",

"output": "output.dat"
}
]
}
]
}
EOF

VILLAS_LOG_PREFIX="[node] " \
villas node config.json &

# Wait for node to complete init
sleep 3

kill %%
wait %%

# Compare data
VILLAS_LOG_PREFIX="[compare] " \
villas compare input.dat output.dat