Artic Base: Add Artic Controller support (#195)

This commit is contained in:
PabloMK7 2024-07-16 22:00:21 +02:00 committed by GitHub
parent 9de19ff7a1
commit 55748d7d1a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 741 additions and 158 deletions

View file

@ -121,6 +121,81 @@ Client::Request::Request(u32 request_id, const std::string& method, size_t max_p
std::min<size_t>(request_packet.method.size(), method.size()));
}
void Client::UDPStream::Start() {
thread_run = true;
handle_thread = std::thread(&Client::UDPStream::Handle, this);
}
void Client::UDPStream::Handle() {
struct sockaddr_in* servaddr = reinterpret_cast<sockaddr_in*>(serv_sockaddr_in.data());
socklen_t serv_sockaddr_len = static_cast<socklen_t>(serv_sockaddr_in.size());
memcpy(servaddr, client.GetServerAddr().data(), client.GetServerAddr().size());
servaddr->sin_port = htons(port);
main_socket = ::socket(AF_INET, SOCK_DGRAM, 0);
if (main_socket == static_cast<SocketHolder>(-1) || !thread_run) {
LOG_ERROR(Network, "Failed to create socket");
return;
}
if (!SetNonBlock(main_socket, true) || !thread_run) {
closesocket(main_socket);
LOG_ERROR(Network, "Cannot set non-blocking socket mode");
return;
}
// Limit receive buffer so that packets don't get qeued and are dropped instead.
int buffer_size_int = static_cast<int>(buffer_size);
if (::setsockopt(main_socket, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<char*>(&buffer_size_int),
sizeof(buffer_size_int)) ||
!thread_run) {
closesocket(main_socket);
LOG_ERROR(Network, "Cannot change receive buffer size");
return;
}
// Send data to server so that it knows client address.
char zero = '\0';
int send_res =
::sendto(main_socket, &zero, sizeof(char), 0,
reinterpret_cast<struct sockaddr*>(serv_sockaddr_in.data()), serv_sockaddr_len);
if (send_res < 0 || !thread_run) {
closesocket(main_socket);
LOG_ERROR(Network, "Cannot send data to socket");
return;
}
ready = true;
std::vector<u8> buffer(buffer_size);
while (thread_run) {
std::chrono::steady_clock::time_point before = std::chrono::steady_clock::now();
int packet_size = ::recvfrom(
main_socket, reinterpret_cast<char*>(buffer.data()), static_cast<int>(buffer.size()), 0,
reinterpret_cast<struct sockaddr*>(serv_sockaddr_in.data()), &serv_sockaddr_len);
if (packet_size > 0) {
if (client.report_traffic_callback) {
client.report_traffic_callback(packet_size);
}
buffer.resize(packet_size);
{
std::scoped_lock l(current_buffer_mutex);
current_buffer = buffer;
}
}
auto elapsed = std::chrono::steady_clock::now() - before;
std::unique_lock lk(thread_cv_mutex);
thread_cv.wait_for(lk, elapsed < read_interval ? (read_interval - elapsed)
: std::chrono::microseconds(50));
}
ready = false;
closesocket(main_socket);
}
Client::~Client() {
StopImpl(false);
@ -182,6 +257,7 @@ bool Client::Connect() {
servaddr.sin_addr.s_addr = ((struct sockaddr_in*)(addrinfo->ai_addr))->sin_addr.s_addr;
servaddr.sin_port = htons(port);
freeaddrinfo(addrinfo);
memcpy(last_sockaddr_in.data(), &servaddr, last_sockaddr_in.size());
if (!ConnectWithTimeout(main_socket, &servaddr, sizeof(servaddr), 10)) {
closesocket(main_socket);
@ -249,15 +325,15 @@ bool Client::Connect() {
std::string str_port;
std::stringstream ss_port(worker_ports.value());
while (std::getline(ss_port, str_port, ',')) {
int port = str_to_int(str_port);
if (port < 0 || port > static_cast<int>(USHRT_MAX)) {
int port_curr = str_to_int(str_port);
if (port_curr < 0 || port_curr > static_cast<int>(USHRT_MAX)) {
shutdown(main_socket, SHUT_RDWR);
closesocket(main_socket);
LOG_ERROR(Network, "Couldn't parse server worker ports");
SignalCommunicationError();
return false;
}
ports.push_back(static_cast<u16>(port));
ports.push_back(static_cast<u16>(port_curr));
}
if (ports.empty()) {
shutdown(main_socket, SHUT_RDWR);
@ -294,6 +370,29 @@ bool Client::Connect() {
return true;
}
std::shared_ptr<Client::UDPStream> Client::NewUDPStream(
const std::string stream_id, size_t buffer_size,
const std::chrono::milliseconds& read_interval) {
auto req = NewRequest("#" + stream_id);
auto resp = Send(req);
if (!resp.has_value()) {
return nullptr;
}
auto port_udp = resp->GetResponseS32(0);
if (!port_udp.has_value()) {
return nullptr;
}
udp_streams.push_back(std::make_shared<UDPStream>(*this, static_cast<u16>(*port_udp),
buffer_size, read_interval));
return udp_streams.back();
}
void Client::StopImpl(bool from_error) {
bool expected = false;
if (!stopped.compare_exchange_strong(expected, true))
@ -303,6 +402,10 @@ void Client::StopImpl(bool from_error) {
SendSimpleRequest("STOP");
}
for (auto it = udp_streams.begin(); it != udp_streams.end(); it++) {
it->get()->Stop();
}
if (ping_thread.joinable()) {
std::scoped_lock l2(ping_cv_mutex);
ping_run = false;

View file

@ -60,6 +60,61 @@ public:
std::vector<std::pair<const void*, size_t>> pending_big_buffers;
};
class UDPStream {
public:
std::vector<u8> GetLastPacket() {
std::scoped_lock l(current_buffer_mutex);
return current_buffer;
}
bool IsReady() {
return ready;
}
void Start();
void Stop() {
if (thread_run && handle_thread.joinable()) {
std::scoped_lock l2(thread_cv_mutex);
thread_run = false;
thread_cv.notify_one();
}
}
UDPStream(Client& _client, u16 _port, size_t _buffer_size,
const std::chrono::milliseconds& _read_interval)
: client(_client), port(_port), buffer_size(_buffer_size),
read_interval(_read_interval) {}
~UDPStream() {
Stop();
if (handle_thread.joinable()) {
handle_thread.join();
}
}
private:
void Handle();
Client& client;
u16 port;
size_t buffer_size;
std::chrono::milliseconds read_interval;
std::array<u8, 16> serv_sockaddr_in{};
bool ready = false;
std::mutex current_buffer_mutex;
std::vector<u8> current_buffer;
SocketHolder main_socket = -1;
std::thread handle_thread;
std::condition_variable thread_cv;
std::mutex thread_cv_mutex;
std::atomic<bool> thread_run = true;
};
friend class UDPStream;
Client(const std::string& _address, u16 _port) : address(_address), port(_port) {
SocketManager::EnableSockets();
}
@ -76,6 +131,10 @@ public:
return Request(GetNextRequestID(), method, max_parameter_count);
}
std::shared_ptr<UDPStream> NewUDPStream(
const std::string stream_id, size_t buffer_size,
const std::chrono::milliseconds& read_interval = std::chrono::milliseconds(0));
void Stop() {
StopImpl(false);
}
@ -97,11 +156,17 @@ public:
report_artic_event_callback = callback;
}
// Returns the server address as a sockaddr_in struct
const std::array<u8, 16>& GetServerAddr() {
return last_sockaddr_in;
}
private:
static constexpr const int SERVER_VERSION = 1;
static constexpr const int SERVER_VERSION = 2;
std::string address;
u16 port;
std::array<u8, 16> last_sockaddr_in;
SocketHolder main_socket = -1;
std::atomic<u32> currRequestID;
@ -124,7 +189,7 @@ private:
std::thread ping_thread;
std::condition_variable ping_cv;
std::mutex ping_cv_mutex;
bool ping_run = true;
std::atomic<bool> ping_run = true;
void StopImpl(bool from_error);
@ -145,6 +210,8 @@ private:
const std::chrono::nanoseconds& read_timeout = std::chrono::nanoseconds(0));
std::optional<std::string> SendSimpleRequest(const std::string& method);
std::vector<std::shared_ptr<UDPStream>> udp_streams;
class Handler {
public:
Handler(Client& _client, u32 _addr, u16 _port, int _id);
@ -242,6 +309,14 @@ public:
return *reinterpret_cast<u64*>(buf->first);
}
std::optional<float> GetResponseFloat(u32 buffer_id) const {
auto buf = GetResponseBuffer(buffer_id);
if (!buf.has_value() || buf->second != sizeof(float)) {
return std::nullopt;
}
return *reinterpret_cast<float*>(buf->first);
}
private:
friend class Client;
friend class Client::Handler;