/**
 * HTTP-based order service implementation with retry logic and
 * connection pooling
 *
 * FLOW OVERVIEW:
 * 1. orderItem() validates input and enqueues order request
 * 2. Background worker thread processes queue sequentially (FIFO)
 * 3. Failed requests are retried with exponential backoff (1s, 2s, 4s...)
 * 4. HTTP clients are cached per host and auto-cleaned when unused
 * 5. Service shuts down gracefully, completing queued orders
 *
 * IMPORTANT LIMITATIONS:
 * - Uses single worker thread - retries of failed requests will block
 *   processing of new orders until retry delay expires
 * - Not suitable for time-critical operations due to sequential processing
 * - Designed for fire-and-forget order notifications, not real-time
 *   transactions
 * - A request that fails after shutdown has been requested is not re-queued;
 *   it is logged and dropped
 */
#include "HttpOrderService.h"
#include "autostore/Version.h"
#include <httplib.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <regex>
#include <stdexcept>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>

namespace nxl::autostore::infrastructure {

namespace {

constexpr int MAX_RETRIES = 3;
constexpr int CONNECTION_TIMEOUT_SECONDS = 5;
constexpr int READ_TIMEOUT_SECONDS = 5;
constexpr int WRITE_TIMEOUT_SECONDS = 5;
constexpr char CONTENT_TYPE_JSON[] = "application/json";
constexpr char HEX_DIGITS[] = "0123456789abcdef";

/**
 * Splits a URL into the address handed to httplib::Client and the request
 * target.
 *
 * @param url URL of the form [http[s]://]host[:port][/path][?query]
 * @return {scheme + host[:port], path + query}; the path defaults to "/"
 * @throws std::runtime_error if the URL does not match the expected format
 *
 * The scheme is kept in the first element on purpose: without it the client
 * is constructed from a bare host and an "https://" URL would be contacted
 * as plain HTTP.
 */
std::pair<std::string, std::string> parseUrl(const std::string& url)
{
  static const std::regex url_regex(
    R"(^(https?:\/\/)?([^\/:]+)(?::(\d+))?(\/[^\?]*)?(\?.*)?$)");

  std::smatch matches;
  // Five capture groups plus the whole match: indices 0..5 must exist.
  if (!std::regex_match(url, matches, url_regex) || matches.size() < 6) {
    throw std::runtime_error("Invalid URL format: " + url);
  }

  std::string host = matches[1].str() + matches[2].str();
  const std::string port = matches[3].str();
  std::string path = matches[4].str();
  const std::string query = matches[5].str();

  if (!port.empty()) {
    host += ":" + port;
  }
  if (path.empty()) {
    path = "/";
  }
  path += query;

  return {host, path};
}

/**
 * Builds the JSON body sent for an order.
 *
 * @param item item whose name and id are serialized
 * @return {"itemName": "<name>", "itemId": "<id>"} with both values escaped
 */
std::string createOrderPayload(const domain::Item& item)
{
  // Escape JSON special characters in strings
  auto escapeJson = [](const std::string& str) {
    std::string escaped;
    escaped.reserve(str.size() + 10); // Reserve extra space for escapes
    for (char c : str) {
      switch (c) {
      case '"':
        escaped += "\\\"";
        break;
      case '\\':
        escaped += "\\\\";
        break;
      case '\b':
        escaped += "\\b";
        break;
      case '\f':
        escaped += "\\f";
        break;
      case '\n':
        escaped += "\\n";
        break;
      case '\r':
        escaped += "\\r";
        break;
      case '\t':
        escaped += "\\t";
        break;
      default: {
        const auto byte = static_cast<unsigned char>(c);
        if (byte < 0x20) {
          // Remaining control characters must not appear raw inside a JSON
          // string; emit them as \u00XX.
          escaped += "\\u00";
          escaped += HEX_DIGITS[byte >> 4];
          escaped += HEX_DIGITS[byte & 0x0F];
        } else {
          escaped += c;
        }
        break;
      }
      }
    }
    return escaped;
  };

  return R"({"itemName": ")" + escapeJson(item.name) + R"(", "itemId": ")"
         + escapeJson(item.id) + "\"}";
}

} // namespace

/**
 * One queued order notification together with its retry bookkeeping.
 */
struct OrderRequest
{
  std::string url;
  std::string payload;
  int retryCount = 0; // number of retries scheduled so far
  // Earliest time at which the worker may send this request.
  std::chrono::system_clock::time_point nextAttemptTime;

  OrderRequest() = default;
  OrderRequest(std::string url, std::string payload, int rc = 0,
               std::chrono::system_clock::time_point nat =
                 std::chrono::system_clock::now())
    : url{std::move(url)}, payload{std::move(payload)}, retryCount{rc},
      nextAttemptTime(nat)
  {}
};

/**
 * Private implementation: owns the request queue, the worker thread that
 * drains it, and the per-host HTTP client map.
 */
class HttpOrderService::Impl
{
public:
  /**
   * Stores the logger, builds the User-Agent string and starts the worker.
   *
   * @throws std::invalid_argument if logger is null
   */
  explicit Impl(ILoggerPtr logger)
    : log{std::move(logger)}, shutdownRequested{false}
  {
    if (!log) {
      throw std::invalid_argument("Logger cannot be null");
    }

    userAgent = "Autostore/" + nxl::getVersionString();
    // Started last so that everything the worker reads is already set up.
    workerThread = std::thread(&Impl::processQueue, this);
  }

  /** Requests shutdown and waits for the worker thread to finish. */
  ~Impl()
  {
    shutdown();
    if (workerThread.joinable()) {
      workerThread.join();
    }
  }

  /**
   * Appends an order to the queue and wakes the worker.
   *
   * @param url     destination URL
   * @param payload request body
   * @throws std::runtime_error if shutdown has already been requested
   */
  void enqueueOrder(const std::string& url, std::string payload)
  {
    {
      std::lock_guard<std::mutex> lock(queueMutex);
      if (shutdownRequested) {
        throw std::runtime_error(
          "Service is shutting down, cannot enqueue new orders");
      }
      orderQueue.emplace(url, std::move(payload));
    }
    queueCondition.notify_one();
  }

private:
  /** Sets the shutdown flag under the queue lock and wakes the worker. */
  void shutdown()
  {
    {
      std::lock_guard<std::mutex> lock(queueMutex);
      shutdownRequested = true;
    }
    queueCondition.notify_one();
  }

  /** True once shutdown was requested and the queue is empty. Call with
   *  queueMutex held. */
  bool shouldShutdown() const
  {
    return shutdownRequested && orderQueue.empty();
  }

  /** True if the request's next attempt time has been reached. */
  bool isRequestReady(const OrderRequest& request) const
  {
    return request.nextAttemptTime <= std::chrono::system_clock::now();
  }

  /**
   * Worker loop: waits for work, honours the retry delay of the request at
   * the front of the queue, and sends requests one at a time. Returns once
   * shutdown was requested and the queue is empty.
   */
  void processQueue()
  {
    while (true) {
      std::unique_lock<std::mutex> lock(queueMutex);

      // Wait for orders or shutdown signal
      queueCondition.wait(
        lock, [this] { return !orderQueue.empty() || shutdownRequested; });

      if (shouldShutdown()) {
        break;
      }

      if (orderQueue.empty()) {
        continue;
      }

      // Check if the front request is ready to be processed
      if (!isRequestReady(orderQueue.front())) {
        // Wait until the next attempt time; a notification wakes us early
        // and the loop re-evaluates from the top.
        auto waitTime = orderQueue.front().nextAttemptTime
                        - std::chrono::system_clock::now();
        if (waitTime > std::chrono::milliseconds(0)) {
          queueCondition.wait_for(lock, waitTime);
        }
        continue;
      }

      // Extract request for processing
      OrderRequest request = std::move(orderQueue.front());
      orderQueue.pop();

      // Release lock before processing to avoid blocking other operations
      lock.unlock();

      processRequest(request);
    }
  }

  /** Sends one request; on any std::exception logs it and schedules a
   *  retry via handleFailedRequest(). */
  void processRequest(OrderRequest& request)
  {
    try {
      sendPostRequest(request.url, request.payload);
      log->i("Order request sent successfully to: %s", request.url.c_str());
    } catch (const std::exception& e) {
      log->e("Failed to send order request to %s: %s", request.url.c_str(),
             e.what());
      handleFailedRequest(request);
    }
  }

  /**
   * Re-queues a failed request with exponential backoff, or gives up once
   * MAX_RETRIES retries have been scheduled. Nothing is re-queued after
   * shutdown has been requested.
   */
  void handleFailedRequest(OrderRequest& request)
  {
    if (request.retryCount < MAX_RETRIES) {
      request.retryCount++;
      // Exponential backoff: 1s, 2s, 4s, 8s...
      auto delay = std::chrono::seconds(1 << (request.retryCount - 1));
      request.nextAttemptTime = std::chrono::system_clock::now() + delay;

      // delay.count() is not guaranteed to be `long`; cast to match %ld.
      log->w("Retrying order request to %s (attempt %d/%d) in %ld seconds",
             request.url.c_str(), request.retryCount, MAX_RETRIES,
             static_cast<long>(delay.count()));

      bool requeued = false;
      {
        std::lock_guard<std::mutex> lock(queueMutex);
        if (!shutdownRequested) {
          orderQueue.push(std::move(request));
          requeued = true;
        }
      }

      if (requeued) {
        queueCondition.notify_one();
      } else {
        // request was not moved from in this branch, so url is still valid.
        log->w("Retry of order request to %s abandoned: service is shutting "
               "down",
               request.url.c_str());
      }
    } else {
      log->e("Max retries exceeded for order request to: %s",
             request.url.c_str());
    }
  }

  /**
   * Returns the live client registered for host, or creates, configures and
   * registers a new one.
   *
   * @param host key as produced by parseUrl() (scheme + host[:port])
   */
  std::shared_ptr<httplib::Client> getOrCreateClient(const std::string& host)
  {
    std::lock_guard<std::mutex> lock(clientsMutex);

    auto it = clients.find(host);
    if (it != clients.end()) {
      // Check if client is still valid
      auto client = it->second.lock();
      if (client) {
        return client;
      } else {
        // Remove expired weak_ptr
        clients.erase(it);
      }
    }

    // Create new client
    auto client = std::make_shared<httplib::Client>(host);
    configureClient(*client);
    clients[host] = client;
    return client;
  }

  /** Applies the timeout, keep-alive and compression settings. */
  void configureClient(httplib::Client& client)
  {
    client.set_connection_timeout(CONNECTION_TIMEOUT_SECONDS, 0);
    client.set_read_timeout(READ_TIMEOUT_SECONDS, 0);
    client.set_write_timeout(WRITE_TIMEOUT_SECONDS, 0);

    // Enable keep-alive for better performance
    client.set_keep_alive(true);

    // NOTE(review): presumably enables request-body compression in
    // cpp-httplib; confirm the receiving servers accept it.
    client.set_compress(true);
  }

  /**
   * Sends payload as a JSON POST to url.
   *
   * @throws std::runtime_error if the URL is malformed, no response was
   *         obtained, or the response status is outside 200-299
   */
  void sendPostRequest(const std::string& url, const std::string& payload)
  {
    auto [host, path] = parseUrl(url);

    auto client = getOrCreateClient(host);

    httplib::Headers headers = {{"Content-Type", CONTENT_TYPE_JSON},
                                {"User-Agent", userAgent},
                                {"Accept", CONTENT_TYPE_JSON}};

    log->i("Sending POST request to: %s%s", host.c_str(), path.c_str());
    log->v(1, "Payload: %s", payload.c_str());

    auto res = client->Post(path, headers, payload, CONTENT_TYPE_JSON);

    if (!res) {
      throw std::runtime_error("Failed to connect to: " + host);
    }

    log->v(2, "Response status: %d", res->status);
    log->v(3, "Response body: %s", res->body.c_str());

    if (res->status < 200 || res->status >= 300) {
      std::string error_msg = "HTTP request failed with status: "
                              + std::to_string(res->status)
                              + " for URL: " + url;
      if (!res->body.empty()) {
        error_msg += " Response: " + res->body;
      }
      throw std::runtime_error(error_msg);
    }
  }

  ILoggerPtr log;
  std::queue<OrderRequest> orderQueue;
  std::mutex queueMutex; // guards orderQueue and writes to shutdownRequested
  std::condition_variable queueCondition;
  std::thread workerThread;
  std::atomic<bool> shutdownRequested;

  // Use weak_ptr to allow automatic cleanup of unused clients.
  // NOTE(review): the only strong reference is the local in
  // sendPostRequest(), so with a single worker each entry expires as soon as
  // the request finishes and the next request creates a new client.
  std::unordered_map<std::string, std::weak_ptr<httplib::Client>> clients;
  std::mutex clientsMutex; // guards clients

  std::string userAgent;
};

HttpOrderService::HttpOrderService(ILoggerPtr logger)
  : impl{std::make_unique<Impl>(std::move(logger))}
{}

HttpOrderService::~HttpOrderService() = default;

/**
 * Validates the item's order URL and queues an order notification for it.
 *
 * @throws std::runtime_error if the URL is empty, lacks "://", or the
 *         service is shutting down
 */
void HttpOrderService::orderItem(const domain::Item& item)
{
  if (item.orderUrl.empty()) {
    throw std::runtime_error("Order URL is empty for item: " + item.name);
  }

  if (item.orderUrl.find("://") == std::string::npos) {
    throw std::runtime_error("Invalid URL format for item: " + item.name
                             + " (missing protocol)");
  }

  std::string payload = createOrderPayload(item);
  impl->enqueueOrder(item.orderUrl, std::move(payload));
}

} // namespace nxl::autostore::infrastructure