diff --git a/tieto-cpu-tracker/lib/thread/CMakeLists.txt b/tieto-cpu-tracker/lib/thread/CMakeLists.txt index b5caf61..d32eaae 100644 --- a/tieto-cpu-tracker/lib/thread/CMakeLists.txt +++ b/tieto-cpu-tracker/lib/thread/CMakeLists.txt @@ -1 +1,29 @@ cmake_minimum_required(VERSION 3.5) + +project(thread LANGUAGES C VERSION 0.1.0) + +set(C_STANDARD 11) +set(TARGET ${PROJECT_NAME}) +set(CMAKE_INCLUDE_CURRENT_DIR ON) +enable_testing() + +add_library(${TARGET} + src/core/TaskQueue.c + src/infrastructure/Thread.c + src/infrastructure/QueuedThread.c + src/infrastructure/Watchdog.c + src/infrastructure/PtBlocker.c + ) + +target_include_directories(${TARGET} + PRIVATE + src + PUBLIC + include + ) + +if (${BUILD_TESTS}) + add_subdirectory(tests/unit) + add_subdirectory(tests/integration) +endif() + diff --git a/tieto-cpu-tracker/lib/thread/include/thread/Blocker.h b/tieto-cpu-tracker/lib/thread/include/thread/Blocker.h new file mode 100644 index 0000000..3ccc0b5 --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/include/thread/Blocker.h @@ -0,0 +1,22 @@ +#ifndef BLOCKER_H +#define BLOCKER_H + +#include + +typedef int8_t BlockerHandle; +typedef void(*LockBlockerFn)(BlockerHandle); +typedef void(*NotifyBlockerFn)(BlockerHandle); + +enum PTB_TIMEOUT_STATUS {PTB_TIMEOUT, PTB_NO_TIMEOUT}; + +BlockerHandle createBlocker(void); + +void lockBlocker(BlockerHandle id); + +enum PTB_TIMEOUT_STATUS lockBlockerTimed(BlockerHandle id, uint32_t timeoutMs); + +void notifyBlocker(BlockerHandle id); + +void freeBlocker(BlockerHandle* id); + +#endif // BLOCKER_H diff --git a/tieto-cpu-tracker/lib/thread/include/thread/QueuedThread.h b/tieto-cpu-tracker/lib/thread/include/thread/QueuedThread.h new file mode 100644 index 0000000..fbbe974 --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/include/thread/QueuedThread.h @@ -0,0 +1,26 @@ +#ifndef QUEUEDTHREAD_H +#define QUEUEDTHREAD_H + +#include "thread/Blocker.h" +#include "thread/TaskQueue.h" +#include "thread/Thread.h" + +typedef struct QueuedThread +{ + TaskQueue* taskQueue; + BlockerHandle blocker; + uint8_t reserved[sizeof(void*)-sizeof(BlockerHandle)]; + Thread* thread; +} QueuedThread; + + +QueuedThread* createQueuedThread(void); + +void postQueuedTask(QueuedThread* qthread, Task task, void* arg); + +void joinQueuedThread(QueuedThread* qthread); + +void freeQueuedThread(QueuedThread** qthread); + + +#endif // QUEUEDTHREAD_H diff --git a/tieto-cpu-tracker/lib/thread/include/thread/TaskQueue.h b/tieto-cpu-tracker/lib/thread/include/thread/TaskQueue.h new file mode 100644 index 0000000..9c30338 --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/include/thread/TaskQueue.h @@ -0,0 +1,42 @@ +#ifndef TASKQUEUE_H +#define TASKQUEUE_H + +#include "Blocker.h" +#include +#include + +typedef void(*Task)(void*); + +typedef struct TaskEntry { + Task task; + void* data; + struct TaskEntry* next; +} TaskEntry; + +typedef struct TaskQueue { + TaskEntry* front; + TaskEntry* back; + NotifyBlockerFn notify; + LockBlockerFn lock; + BlockerHandle blocker; + bool running; + uint8_t reserved[6]; +} TaskQueue; + +typedef struct TaskQueue* TaskQueuePtr; + +TaskQueuePtr createTaskQueue(BlockerHandle blocker, LockBlockerFn lock, NotifyBlockerFn notify); + +void enqueueTask(TaskQueuePtr queue, Task task, void* data); + +TaskEntry* dequeueTask(TaskQueuePtr queue); + +void runQueue(TaskQueuePtr queue); + +void stopQueue(TaskQueuePtr queue); + +void freeTask(TaskEntry** entry); + +void freeTaskQueue(TaskQueuePtr* queue); + +#endif // TASKQUEUE_H diff --git a/tieto-cpu-tracker/lib/thread/include/thread/Thread.h b/tieto-cpu-tracker/lib/thread/include/thread/Thread.h new file mode 100644 index 0000000..f6d3359 --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/include/thread/Thread.h @@ -0,0 +1,21 @@ +#ifndef THREAD_H +#define THREAD_H + +typedef void*(*ThreadFn)(void*); + +typedef struct Thread +{ + void* impl; +} Thread; + + +Thread* createThread(ThreadFn threadFn, void* arg); + +void joinThread(Thread* thread); + +void killThread(Thread* thread); + +void freeThread(Thread** thread); + + +#endif // THREAD_H diff --git a/tieto-cpu-tracker/lib/thread/include/thread/Watchdog.h b/tieto-cpu-tracker/lib/thread/include/thread/Watchdog.h new file mode 100644 index 0000000..c79b903 --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/include/thread/Watchdog.h @@ -0,0 +1,39 @@ +#ifndef WATCHDOG_H +#define WATCHDOG_H + +#include "thread/QueuedThread.h" +#include + +typedef void(*WatchdogEventHandler)(Thread*); + +typedef void(*LogFn)(const char* tag, const char* format, ...); + +typedef struct Watchdog +{ + Thread* watchingThread; + void* threads; + WatchdogEventHandler eventHandler; + LogFn log; + uint32_t intervalMs; + bool running; + BlockerHandle blocker; + uint8_t reserved[2]; +} Watchdog; + +Watchdog* createWatchdog(uint32_t intervalMs, WatchdogEventHandler eventHandler, LogFn logFn); + +QueuedThread* createWatchedThread(Watchdog* watchdog); + +void startWatchdog(Watchdog* watchdog); + +void startWatchdogSync(Watchdog* watchdog); + +void stopWatchdog(Watchdog* watchdog); + +void joinWatchedThreads(Watchdog* watchdog); + +void freeWatchedThreads(Watchdog* watchdog); + +void freeWatchdog(Watchdog** watchdog); + +#endif // WATCHDOG_H diff --git a/tieto-cpu-tracker/lib/thread/src/core/TaskQueue.c b/tieto-cpu-tracker/lib/thread/src/core/TaskQueue.c new file mode 100644 index 0000000..4305cd3 --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/src/core/TaskQueue.c @@ -0,0 +1,95 @@ +#include "thread/TaskQueue.h" + +#include + +inline static void runUntilEmpty(TaskQueuePtr queue) { + TaskEntry* current = NULL; + for (current = dequeueTask(queue); + current != NULL; + current = dequeueTask(queue)) { + if (queue->running) { // any task could've changed it + current->task(current->data); + } + freeTask(¤t); + } +} + +TaskQueuePtr createTaskQueue(BlockerHandle blocker, LockBlockerFn lock, NotifyBlockerFn notify) +{ + TaskQueuePtr queue = calloc(1, sizeof(TaskQueue)); + queue->front = NULL; + queue->back = NULL; + queue->blocker = blocker; + queue->lock = lock; + queue->notify = notify; + queue->running = false; + return queue; +} + +void enqueueTask(TaskQueuePtr queue, Task task, void* data) +{ + TaskEntry* newEntry = (TaskEntry*)calloc(1, sizeof(TaskEntry)); + + newEntry->task = task; + newEntry->data = data; + newEntry->next = NULL; + + if (queue->front == NULL) { + queue->front = newEntry; + queue->back = newEntry; + } else { + queue->back->next = newEntry; + queue->back = newEntry; + } + queue->notify(queue->blocker); +} + +TaskEntry* dequeueTask(TaskQueuePtr queue) +{ + TaskEntry* current = NULL; + if (queue->front == NULL) { + return current; + } + current = queue->front; + queue->front = current->next; + if (queue->front == NULL) { + queue->back = NULL; + } + return current; +} + +void runQueue(TaskQueuePtr queue) +{ + queue->running = true; + while (queue->running) { + runUntilEmpty(queue); + if (queue->running) { + queue->lock(queue->blocker); + } + } + // execute remaining tasks + runUntilEmpty(queue); +} + +void stopQueue(TaskQueuePtr queue) +{ + queue->running = false; + queue->notify(queue->blocker); +} + +void freeTask(TaskEntry** entry) +{ + free(*entry); + *entry = NULL; +} + +void freeTaskQueue(TaskQueuePtr* queue) +{ + TaskEntry* task = dequeueTask(*queue); + while (task != NULL) { + freeTask(&task); + task = dequeueTask(*queue); + } + free(*queue); + *queue = NULL; +} diff --git a/tieto-cpu-tracker/lib/thread/src/infrastructure/PtBlocker.c b/tieto-cpu-tracker/lib/thread/src/infrastructure/PtBlocker.c new file mode 100644 index 0000000..728ac9d --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/src/infrastructure/PtBlocker.c @@ -0,0 +1,76 @@ +#include "thread/Blocker.h" + +#include +#include +#include +#include + +#ifndef BLOCKER_COUNT_MAX +#define BLOCKER_COUNT_MAX 8 +#endif + +typedef struct PtBlocker +{ + pthread_mutex_t mutex; + pthread_cond_t cond; +} PtBlocker; + +// TODO: dynamic list? +static PtBlocker* blockers[BLOCKER_COUNT_MAX] = {NULL}; + +BlockerHandle createBlocker(void) +{ + BlockerHandle id = 0; + while (blockers[id] != NULL && id < BLOCKER_COUNT_MAX) { + ++id; + } + if (id == BLOCKER_COUNT_MAX) { + return -1; + } + blockers[id] = calloc(1, sizeof(PtBlocker)); + return id; +} + +void lockBlocker(BlockerHandle id) +{ + pthread_mutex_lock(&blockers[id]->mutex); + pthread_cond_wait(&blockers[id]->cond, &blockers[id]->mutex); + pthread_mutex_unlock(&blockers[id]->mutex); +} + +enum PTB_TIMEOUT_STATUS lockBlockerTimed(BlockerHandle id, uint32_t timeoutMs) +{ + enum PTB_TIMEOUT_STATUS status = PTB_TIMEOUT; + int waitResult = 0; + struct timespec timeout; + clock_gettime(CLOCK_REALTIME, &timeout); + timeout.tv_sec += timeoutMs / 1000; + timeout.tv_nsec += (timeoutMs % 1000) * 1000000; + + pthread_mutex_lock(&blockers[id]->mutex); + waitResult = pthread_cond_timedwait(&blockers[id]->cond, &blockers[id]->mutex, &timeout); + if (waitResult == 0) { + status = PTB_NO_TIMEOUT; + } + pthread_mutex_unlock(&blockers[id]->mutex); + return status; +} + +void notifyBlocker(BlockerHandle id) +{ + pthread_mutex_lock(&blockers[id]->mutex); + pthread_cond_signal(&blockers[id]->cond); + pthread_mutex_unlock(&blockers[id]->mutex); +} + +void freeBlocker(BlockerHandle* id) +{ + if (*id == -1) { + return; + } + pthread_mutex_destroy(&blockers[*id]->mutex); + pthread_cond_destroy(&blockers[*id]->cond); + free(blockers[*id]); + blockers[*id] = NULL; + *id = -1; +} diff --git a/tieto-cpu-tracker/lib/thread/src/infrastructure/QueuedThread.c b/tieto-cpu-tracker/lib/thread/src/infrastructure/QueuedThread.c new file mode 100644 index 0000000..51a0115 --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/src/infrastructure/QueuedThread.c @@ -0,0 +1,44 @@ +#include "thread/QueuedThread.h" +#include "thread/Blocker.h" + +#include +#include + +static void* queueThread(void* arg) +{ + QueuedThread* qthread = (QueuedThread*)arg; + runQueue(qthread->taskQueue); + return NULL; +} + +QueuedThread* createQueuedThread(void) +{ + QueuedThread* qthread = calloc(1, sizeof(QueuedThread)); + qthread->blocker = createBlocker(); + qthread->taskQueue = createTaskQueue(qthread->blocker, &lockBlocker, ¬ifyBlocker); + qthread->thread = createThread(&queueThread, qthread); + return qthread; +} + +void postQueuedTask(QueuedThread* qthread, Task task, void* arg) +{ + enqueueTask(qthread->taskQueue, task, arg); +} + +void joinQueuedThread(QueuedThread* qthread) +{ + stopQueue(qthread->taskQueue); + joinThread(qthread->thread); +} + +void freeQueuedThread(QueuedThread** qthread) +{ + QueuedThread* t = *qthread; + freeTaskQueue(&(t->taskQueue)); + freeBlocker(&(t->blocker)); + freeThread(&(t->thread)); + free(t); + + *qthread = NULL; +} + diff --git a/tieto-cpu-tracker/lib/thread/src/infrastructure/Thread.c b/tieto-cpu-tracker/lib/thread/src/infrastructure/Thread.c new file mode 100644 index 0000000..a68f44e --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/src/infrastructure/Thread.c @@ -0,0 +1,43 @@ +#include "thread/Thread.h" + +#include +#include +#include +#include + +typedef struct ThreadPthreadImpl +{ + pthread_t thread; +} ThreadPthreadImpl; + + +Thread* createThread(ThreadFn threadFn, void* arg) +{ + ThreadPthreadImpl* pthreadImpl = NULL; + Thread* thread = calloc(1, sizeof(Thread)); + thread->impl = calloc(1, sizeof(ThreadPthreadImpl)); + pthreadImpl = (ThreadPthreadImpl*)thread->impl; + if (pthread_create(&(pthreadImpl->thread), NULL, threadFn, arg) != 0) { + printf("Thread: Failed to create thread.\n"); + } + return thread; +} + +void joinThread(Thread* thread) +{ + ThreadPthreadImpl* pthreadImpl = (ThreadPthreadImpl*)thread->impl; + pthread_join(pthreadImpl->thread, NULL); +} + +void killThread(Thread* thread) +{ + ThreadPthreadImpl* pthreadImpl = (ThreadPthreadImpl*)thread->impl; + pthread_kill(pthreadImpl->thread, SIGUSR1); // thread needs to handle or SIG_IGN this +} + +void freeThread(Thread** thread) +{ + free((ThreadPthreadImpl*)((*thread)->impl)); + free(*thread); + *thread = NULL; +} diff --git a/tieto-cpu-tracker/lib/thread/src/infrastructure/Watchdog.c b/tieto-cpu-tracker/lib/thread/src/infrastructure/Watchdog.c new file mode 100644 index 0000000..3b5fa3f --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/src/infrastructure/Watchdog.c @@ -0,0 +1,158 @@ +#include "thread/Watchdog.h" +#include "thread/Blocker.h" + +#include +#include +#include +#include + +#include + +#define TAG "Watchdog" + +typedef struct ThreadNode +{ + QueuedThread* thread; + struct ThreadNode* next; + uint64_t lastCheck; +} ThreadNode; + +static uint64_t getCurrentTimestampMs() { + uint64_t nowMs; + struct timespec currentTime; + clock_gettime(CLOCK_REALTIME, ¤tTime); + nowMs = (uint64_t)currentTime.tv_sec * 1000UL + + (uint64_t)currentTime.tv_nsec / 1000000UL; + return nowMs; +} + +static void check(void* arg) +{ + ThreadNode* node = (ThreadNode*)arg; + node->lastCheck = getCurrentTimestampMs(); +} + +static void* watchThread(void* arg) +{ + Watchdog* watchdog = (Watchdog*)arg; + uint64_t sendMs = 0; + uint64_t responseMs = 0; + ThreadNode* node = NULL; + while (watchdog->running) { + // post check tasks + sendMs = getCurrentTimestampMs(); + node = watchdog->threads; + while (node != NULL) { + watchdog->log(TAG, "Posting check task for thread %p", node->thread->thread); + postQueuedTask(node->thread, &check, node); + node = node->next; + } + + lockBlockerTimed(watchdog->blocker, watchdog->intervalMs); + + node = watchdog->threads; + while (node != NULL) { + responseMs = node->lastCheck - sendMs; + watchdog->log(TAG, "Response time for thread %p = %dms", node->thread->thread, responseMs); + if (responseMs > watchdog->intervalMs) { + if (watchdog->eventHandler != NULL && watchdog->running) { + watchdog->log(TAG, "Thread %p is unresponsive, notifying handler", node->thread->thread); + watchdog->eventHandler(node->thread->thread); + } + } + node = node->next; + } + } + pthread_exit(NULL); + return NULL; +} + + + +Watchdog* createWatchdog(uint32_t intervalMs, WatchdogEventHandler eventHandler, LogFn logFn) +{ + Watchdog* watchdog = calloc(1, sizeof(Watchdog)); + watchdog->intervalMs = intervalMs; + watchdog->threads = NULL; + watchdog->watchingThread = NULL; + watchdog->blocker = createBlocker(); + watchdog->eventHandler = eventHandler; + watchdog->log = logFn; + return watchdog; +} + +void startWatchdog(Watchdog* watchdog) +{ + if (watchdog->running || watchdog->watchingThread != NULL) { + printf("Watchdog: Watching thread is already running\n"); + return; + } + watchdog->running = true; + watchdog->watchingThread = createThread(watchThread, watchdog); +} + +void startWatchdogSync(Watchdog* watchdog) +{ + if (watchdog->running) { + printf("Watchdog: Watching thread is already running\n"); + return; + } + watchdog->running = true; + watchThread(watchdog); +} + +void stopWatchdog(Watchdog* watchdog) +{ + watchdog->running = false; + notifyBlocker(watchdog->blocker); + if (watchdog->watchingThread == NULL) { + return; + } + joinThread(watchdog->watchingThread); + freeThread(&(watchdog->watchingThread)); + watchdog->watchingThread = NULL; +} + +QueuedThread* createWatchedThread(Watchdog* watchdog) +{ + QueuedThread* qthread = createQueuedThread(); + ThreadNode* node = calloc(1, sizeof(ThreadNode)); + node->thread = qthread; + node->next = (watchdog->threads == NULL) ? NULL : watchdog->threads; + node->lastCheck = getCurrentTimestampMs(); + watchdog->threads = node; + return qthread; +} + +void joinWatchedThreads(Watchdog* watchdog) +{ + ThreadNode* node = watchdog->threads; + while (node != NULL) { + joinQueuedThread(node->thread); + node = node->next; + } +} + +void freeWatchedThreads(Watchdog* watchdog) +{ + ThreadNode* node = watchdog->threads; + while (node != NULL) { + freeQueuedThread(&(node->thread)); + node = node->next; + } +} + +void freeWatchdog(Watchdog** watchdog) +{ + Watchdog* w = *watchdog; + ThreadNode* node = w->threads; + ThreadNode* tmpNode = NULL; + while (node != NULL) { + tmpNode = node; + node = node->next; + free(tmpNode); + } + freeBlocker(&(w->blocker)); + free(w); + *watchdog = NULL; +} diff --git a/tieto-cpu-tracker/lib/thread/tests/integration/CMakeLists.txt b/tieto-cpu-tracker/lib/thread/tests/integration/CMakeLists.txt new file mode 100644 index 0000000..8f5f069 --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/tests/integration/CMakeLists.txt @@ -0,0 +1,20 @@ +cmake_minimum_required(VERSION 3.5) + +set(C_STANDARD 11) +enable_testing() + +set(SRC_DIR ../../src) + +include_directories(${SRC_DIR}) + +add_executable(ThreadTest Thread.test.c) +target_link_libraries(ThreadTest thread) +add_test(ThreadTest ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/ThreadTest) + +add_executable(QueuedThreadTest QueuedThread.test.c) +target_link_libraries(QueuedThreadTest thread) +add_test(QueuedThreadTest ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/QueuedThreadTest) + +add_executable(WatchdogTest Watchdog.test.c) +target_link_libraries(WatchdogTest thread) +add_test(WatchdogTest ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/WatchdogTest) diff --git a/tieto-cpu-tracker/lib/thread/tests/integration/QueuedThread.test.c b/tieto-cpu-tracker/lib/thread/tests/integration/QueuedThread.test.c new file mode 100644 index 0000000..22eefa2 --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/tests/integration/QueuedThread.test.c @@ -0,0 +1,36 @@ +#include "thread/QueuedThread.h" +#include "thread/Blocker.h" + +#include +#include + +static const int COUNT = 3; +static const int SLEEP_US = 200*1000; + + +void task(void* arg); + +int main() { + QueuedThread* thread = createQueuedThread(); + postQueuedTask(thread, &task, NULL); + postQueuedTask(thread, &task, NULL); + + for (int i = 0; i < COUNT*2+1; i++) { + printf("Main: %d\n", i); + usleep(SLEEP_US); + } + + joinQueuedThread(thread); + freeQueuedThread(&thread); + return 0; +} + +// --- + +void task(void* arg) { + (void)arg; + for (int i = 0; i < COUNT; i++) { + printf("Thread: %d\n", i); + usleep(SLEEP_US); + } +} diff --git a/tieto-cpu-tracker/lib/thread/tests/integration/Thread.test.c b/tieto-cpu-tracker/lib/thread/tests/integration/Thread.test.c new file mode 100644 index 0000000..1558e8e --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/tests/integration/Thread.test.c @@ -0,0 +1,47 @@ +#include "thread/Thread.h" +#include "thread/Blocker.h" + +#include +#include + +static const int COUNT = 3; +static const int SLEEP_US = 200*1000; + + +void* run(void* arg); + +int main() { + Thread* thread = NULL; + BlockerHandle blocker = createBlocker(); + if (blocker == -1) { + printf("Main: Failed to create blocker.\n"); + return 1; + } + + thread = createThread(&run, &blocker); + + for (int i = 0; i < COUNT+1; i++) { + printf("Main: %d\n", i); + usleep(SLEEP_US); + } + notifyBlocker(blocker); + + joinThread(thread); + freeThread(&thread); + freeBlocker(&blocker); + return 0; +} + +// --- + +void* run(void* arg) { + BlockerHandle* blocker = arg; + for (int i = 0; i < COUNT; i++) { + printf("Thread: %d\n", i); + usleep(SLEEP_US); + } + printf("Blocker ID: %d\n", *blocker); + lockBlocker(*blocker); + printf("Thread: Blocker notified, continuing execution.\n"); + return NULL; +} diff --git a/tieto-cpu-tracker/lib/thread/tests/integration/Watchdog.test.c b/tieto-cpu-tracker/lib/thread/tests/integration/Watchdog.test.c new file mode 100644 index 0000000..43013b0 --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/tests/integration/Watchdog.test.c @@ -0,0 +1,96 @@ +#include "thread/Watchdog.h" + +#include +#include +#include +#include + +static const int COUNT = 3; +static const int SLEEP_US = 500*1000; + +static const int WATCHDOG_INTERVAL_MS = 2000; + +static void run1(void* arg); +static void run2(void* arg); +static void run3(void* arg); + +static QueuedThread* t1 = NULL; +static QueuedThread* t2 = NULL; +static QueuedThread* t3 = NULL; + +__attribute__((__format__ (__printf__, 2, 0))) +static void stdLog(const char* tag, const char* format, ...) +{ + va_list args; + printf("%s ", tag); + va_start(args, format); + vprintf(format, args); + va_end(args); +} + +_Noreturn static void onThreadUnresponsive(Thread* thread) +{ + printf("Thread %p is unresponsive, exiting\n", (void*)(thread)); + assert(thread == t3->thread); + _exit(0); // this actually means passing the test +} + +int main() +{ + Watchdog* watchdog = createWatchdog(WATCHDOG_INTERVAL_MS, &onThreadUnresponsive, &stdLog); + + t1 = createWatchedThread(watchdog); + t2 = createWatchedThread(watchdog); + t3 = createWatchedThread(watchdog); + + printf("T1: %p\nT2: %p\nT3: %p\n", + (void*)t1->thread, (void*)t2->thread, (void*)t3->thread); + + startWatchdog(watchdog); + + postQueuedTask(t1, &run1, NULL); + postQueuedTask(t2, &run2, NULL); + postQueuedTask(t3, &run3, NULL); + + for (int i = 0; i < COUNT+1; i++) { + printf("Main: %d\n", i); + usleep(SLEEP_US); + } + + // give watchdog a chance to spot the hanged thread + usleep((WATCHDOG_INTERVAL_MS+500) * 1000); + + stopWatchdog(watchdog); + joinWatchedThreads(watchdog); + freeWatchedThreads(watchdog); + freeWatchdog(&watchdog); + printf("Done.\n"); + return 1; // this actually means failing the test +} + +// --- + +void run1(void* arg) { + (void)arg; + for (int i = 0; i < COUNT; i++) { + printf("Thread 1: %d\n", i); + usleep(SLEEP_US); + } +} + +void run2(void* arg) { + (void)arg; + for (int i = 0; i < COUNT; i++) { + printf("Thread 2: %d\n", i); + usleep(SLEEP_US); + } +} + +_Noreturn void run3(void* arg) { + (void)arg; + for (int i = 0; i < COUNT; i++) { + printf("Thread 3: %d\n", i); + usleep(SLEEP_US); + } + while (1) {} // loop forever +} diff --git a/tieto-cpu-tracker/lib/thread/tests/unit/CMakeLists.txt b/tieto-cpu-tracker/lib/thread/tests/unit/CMakeLists.txt new file mode 100644 index 0000000..f0986e4 --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/tests/unit/CMakeLists.txt @@ -0,0 +1,17 @@ +cmake_minimum_required(VERSION 3.5) + +set(C_STANDARD 11) +enable_testing() + +set(SRC_DIR ../../src) +set(INC_DIR ../../include) + +include_directories(${SRC_DIR} ${INC_DIR}) +include_directories(SYSTEM ${EXTERN_DIR}/greatest) + +add_executable(TaskQueueTest + ${SRC_DIR}/core/TaskQueue.c + TaskQueue.test.c + ) + +add_test(TaskQueueTest ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/TaskQueueTest) diff --git a/tieto-cpu-tracker/lib/thread/tests/unit/TaskQueue.test.c b/tieto-cpu-tracker/lib/thread/tests/unit/TaskQueue.test.c new file mode 100644 index 0000000..1803795 --- /dev/null +++ b/tieto-cpu-tracker/lib/thread/tests/unit/TaskQueue.test.c @@ -0,0 +1,175 @@ +#include "thread/TaskQueue.h" + +#include +#include +#include +#include + +// Testing macros: +#define RUN_TEST(test_function) \ +do {test_setup();test_function();test_teardown();} while (0) +#define CHECK(type, value, operator, expected, message) \ + do {if (!(value operator expected)) { \ + printf("-- Failed: %s\n---- Failed expression: %" #type " %s %" #type "\n", \ + message, value, #operator, expected); ++failCounter;\ + } else {printf("-- PASSED (%s %s %s)\n", \ + #value, #operator, #expected);}} while (0) +static int failCounter = 0; + +// usage: +// CHECK(type, actual, operator, expected, message); +// RUN_TEST(test_function); + +enum { + IDX_RESULT_TASK_1 = 0, + IDX_RESULT_TASK_2, + IDX_RESULT_TASK_ORDER, + IDX_RESULT_TASK_STOP, + IDX_RESULT_BLOCK, + IDX_RESULT_NOTIFY, + IDX_RESULT_COUNT +}; + +static int taskResults[IDX_RESULT_COUNT] = {0}; + +static BlockerHandle blockerHnd = 99; +static TaskQueuePtr globalQueue = NULL; + +void testTask1(void*); +void testTask2(void*); +void testTaskStop(void*); +void testTaskPostItself(void*); +void blockMock(BlockerHandle); +void notifyMock(BlockerHandle); + +static void test_setup(void){} +static void test_teardown(void){} + +static void test_createTaskQueue(void) +{ + TaskQueuePtr queue = createTaskQueue(blockerHnd, &blockMock, ¬ifyMock); + CHECK(p, (void*)queue, !=, NULL, "createTaskQueue should return a valid task queue"); + freeTaskQueue(&queue); +} + +static void test_runQueue(void) +{ + const char* msg = "runQueue should execute enqueued tasks in order with no loop"; + TaskQueuePtr queue = createTaskQueue(blockerHnd, &blockMock, ¬ifyMock); + enqueueTask(queue, &testTask1, NULL); + enqueueTask(queue, &testTask2, NULL); + enqueueTask(queue, &testTaskStop, queue); + runQueue(queue); + + CHECK(i, taskResults[IDX_RESULT_TASK_1] , ==, 1, msg); + CHECK(i, taskResults[IDX_RESULT_TASK_2] , ==, 2, msg); + CHECK(i, taskResults[IDX_RESULT_TASK_ORDER], ==, 3, msg); + CHECK(i, taskResults[IDX_RESULT_TASK_STOP] , ==, 1, msg); + + freeTaskQueue(&queue); +} + +static void test_enqueueNotify(void) +{ + const char* msg = "Enqueueing new task should notify the blocker"; + TaskQueuePtr queue = createTaskQueue(blockerHnd, &blockMock, ¬ifyMock); + memset(taskResults, 0, sizeof(taskResults)); + + enqueueTask(queue, &testTask1, NULL); + enqueueTask(queue, &testTask1, NULL); + + CHECK(i, taskResults[IDX_RESULT_NOTIFY], ==, 2, msg); + freeTaskQueue(&queue); +} + +static void test_stopNotify(void) +{ + const char* msg = "Stopping the queue should notify the blocker"; + TaskQueuePtr queue = createTaskQueue(blockerHnd, &blockMock, ¬ifyMock); + memset(taskResults, 0, sizeof(taskResults)); + + enqueueTask(queue, &testTaskStop, queue); + runQueue(queue); + + CHECK(i, taskResults[IDX_RESULT_NOTIFY], ==, 2, msg); // enqueue + stop + + freeTaskQueue(&queue); +} + +static void test_blockOnEmpty(void) +{ + const char* msg = "Empty queue should lock the blocker"; + TaskQueuePtr queue = createTaskQueue(blockerHnd, &blockMock, ¬ifyMock); + memset(taskResults, 0, sizeof(taskResults)); + + globalQueue = queue; + enqueueTask(queue, &testTask1, NULL); + runQueue(queue); + + CHECK(i, taskResults[IDX_RESULT_BLOCK], ==, 1, msg); + freeTaskQueue(&queue); +} + +static void test_leak(void) +{ + // This is a leak test for valgrind. + // The scenario is that a task enqueueing itself again + // shouldn't cause infinite loop nor memory leak. + + TaskQueuePtr queue = createTaskQueue(blockerHnd, &blockMock, ¬ifyMock); + enqueueTask(queue, &testTaskPostItself, queue); + enqueueTask(queue, &testTaskStop, queue); + runQueue(queue); + freeTaskQueue(&queue); +} + +int main(void) { + RUN_TEST(test_createTaskQueue); + RUN_TEST(test_runQueue); + RUN_TEST(test_enqueueNotify); + RUN_TEST(test_stopNotify); + RUN_TEST(test_blockOnEmpty); + RUN_TEST(test_leak); + return failCounter; +} + +// --- + +void blockMock(BlockerHandle blocker) +{ + (void)blocker; + taskResults[IDX_RESULT_BLOCK] += 1; + if (globalQueue) { + stopQueue(globalQueue); + } +} + +void notifyMock(BlockerHandle blocker) +{ + (void)blocker; + taskResults[IDX_RESULT_NOTIFY] += 1; +} + +void testTask1(void* data) +{ + (void)data; + taskResults[IDX_RESULT_TASK_1] += 1; + taskResults[IDX_RESULT_TASK_ORDER] += 6; +} +void testTask2(void* data) +{ + (void)data; + taskResults[IDX_RESULT_TASK_2] += 2; + taskResults[IDX_RESULT_TASK_ORDER] /= 2; +} + +void testTaskStop(void* data) +{ + taskResults[IDX_RESULT_TASK_STOP] += 1; + stopQueue((TaskQueuePtr)data); +} + +void testTaskPostItself(void* data) +{ + enqueueTask((TaskQueuePtr)data, testTaskPostItself, data); +}