#include "thread/TaskQueue.h" #include inline static void runUntilEmpty(TaskQueuePtr queue) { TaskEntry* current = NULL; for (current = dequeueTask(queue); current != NULL; current = dequeueTask(queue)) { if (queue->running) { // any task could've changed it current->task(current->data); } freeTask(¤t); } } TaskQueuePtr createTaskQueue(BlockerHandle blocker, LockBlockerFn lock, NotifyBlockerFn notify) { TaskQueuePtr queue = calloc(1, sizeof(TaskQueue)); queue->front = NULL; queue->back = NULL; queue->blocker = blocker; queue->lock = lock; queue->notify = notify; queue->running = false; return queue; } void enqueueTask(TaskQueuePtr queue, Task task, void* data) { TaskEntry* newEntry = (TaskEntry*)calloc(1, sizeof(TaskEntry)); newEntry->task = task; newEntry->data = data; newEntry->next = NULL; if (queue->front == NULL) { queue->front = newEntry; queue->back = newEntry; } else { queue->back->next = newEntry; queue->back = newEntry; } queue->notify(queue->blocker); } TaskEntry* dequeueTask(TaskQueuePtr queue) { TaskEntry* current = NULL; if (queue->front == NULL) { return current; } current = queue->front; queue->front = current->next; if (queue->front == NULL) { queue->back = NULL; } return current; } void runQueue(TaskQueuePtr queue) { queue->running = true; while (queue->running) { runUntilEmpty(queue); if (queue->running) { queue->lock(queue->blocker); } } // execute remaining tasks runUntilEmpty(queue); } void stopQueue(TaskQueuePtr queue) { queue->running = false; queue->notify(queue->blocker); } void freeTask(TaskEntry** entry) { free(*entry); *entry = NULL; } void freeTaskQueue(TaskQueuePtr* queue) { TaskEntry* task = dequeueTask(*queue); while (task != NULL) { freeTask(&task); task = dequeueTask(*queue); } free(*queue); *queue = NULL; }