ThreadPool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <stdbool.h>
#include <pthread.h>
struct ThreadJob
{
void* (*CallbackFunction)(void *arg);
void * Arg;
struct ThreadJob * Next;
};
typedef struct
{
int ThreadCount;
int QueueMaxCount;
struct ThreadJob * JobQueueHead;
struct ThreadJob * JobQueueTail;
pthread_t * Threads;
pthread_mutex_t Mutex;
pthread_cond_t QueueEmptyCond;
pthread_cond_t QueueNotEmptyCond;
pthread_cond_t QueueNotFullCond;
int QueueCount;
bool IsQueueClosed;
bool IsPoolClosed;
} ThreadPool;
ThreadPool * ThreadPool_Init(int threadCount, int queueMaxCount);
void ThreadPool_AddJob(ThreadPool * threadPool, void * (*callbackFunction)(void * arg), void * arg);
void ThreadPool_Destroy(ThreadPool * threadPool);
#endif
ThreadPool.c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include "ThreadPool.h"
static void * _function(void * arg)
{
if(!arg)
{
fprintf(stderr, "[_function()] Argument 'arg' is invalid");
exit(EXIT_FAILURE);
}
ThreadPool * threadPool = (ThreadPool *)arg;
struct ThreadJob *threadJob = NULL;
while (true)
{
pthread_mutex_lock(&(threadPool->Mutex));
while (threadPool->QueueCount == 0 && !threadPool->IsPoolClosed)
pthread_cond_wait(&(threadPool->QueueNotEmptyCond), &(threadPool->Mutex));
if (threadPool->IsPoolClosed)
{
pthread_mutex_unlock(&(threadPool->Mutex));
pthread_exit(NULL);
}
threadPool->QueueCount--;
threadJob = threadPool->JobQueueHead;
if (threadPool->QueueCount == 0)
{
threadPool->JobQueueHead = NULL;
threadPool->JobQueueTail = NULL;
}
else
threadPool->JobQueueHead = threadJob->Next;
if (threadPool->QueueCount == 0)
pthread_cond_signal(&(threadPool->QueueEmptyCond));
if (threadPool->QueueCount == threadPool->QueueMaxCount - 1)
pthread_cond_broadcast(&(threadPool->QueueNotFullCond));
pthread_mutex_unlock(&(threadPool->Mutex));
(*(threadJob->CallbackFunction))(threadJob->Arg);
free(threadJob);
}
}
ThreadPool * ThreadPool_Init(int threadCount, int queueMaxCount)
{
ThreadPool * threadPool = NULL;
threadPool = malloc(sizeof(ThreadPool));
if (!threadPool)
{
perror("[ThreadPool_Init()] malloc");
exit(EXIT_FAILURE);
}
threadPool->ThreadCount = threadCount;
threadPool->QueueMaxCount = queueMaxCount;
threadPool->QueueCount = 0;
threadPool->JobQueueHead = NULL;
threadPool->JobQueueTail = NULL;
if (pthread_mutex_init(&(threadPool->Mutex), NULL))
{
perror("[ThreadPool_Init()] pthread_mutex_init");
exit(EXIT_FAILURE);
}
if (pthread_cond_init(&(threadPool->QueueEmptyCond), NULL))
{
perror("[ThreadPool_Init()] pthread_mutex_init");
exit(EXIT_FAILURE);
}
if (pthread_cond_init(&(threadPool->QueueNotEmptyCond), NULL))
{
perror("[ThreadPool_Init()] pthread_cond_init");
exit(EXIT_FAILURE);
}
if (pthread_cond_init(&(threadPool->QueueNotFullCond), NULL))
{
perror("[ThreadPool_Init()] pthread_cond_init");
exit(EXIT_FAILURE);
}
threadPool->Threads = malloc(sizeof(pthread_t) * threadCount);
if (!threadPool->Threads)
{
perror("[ThreadPool_Init()] malloc");
exit(EXIT_FAILURE);
}
threadPool->IsQueueClosed = false;
threadPool->IsPoolClosed = false;
for (int i = 0; i < threadPool->ThreadCount; ++i)
pthread_create(&(threadPool->Threads[i]), NULL, _function, threadPool);
return threadPool;
}
void ThreadPool_AddJob(ThreadPool * threadPool, void * (*callbackFunction)(void * arg), void * arg)
{
if(!threadPool)
{
fprintf(stderr, "[ThreadPool_AddJob()] Argument 'threadPool' is invalid");
exit(EXIT_FAILURE);
}
if(!callbackFunction)
{
fprintf(stderr, "[ThreadPool_AddJob()] Argument 'callbackFunction' is invalid");
exit(EXIT_FAILURE);
}
pthread_mutex_lock(&(threadPool->Mutex));
while (threadPool->QueueCount == threadPool->QueueMaxCount
&& !(threadPool->IsQueueClosed || threadPool->IsPoolClosed))
pthread_cond_wait(&(threadPool->QueueNotFullCond), &(threadPool->Mutex));
if (threadPool->IsQueueClosed || threadPool->IsPoolClosed)
{
fprintf(stderr, "[ThreadPool_AddJob()] ThreadPool or Queue has been closed");
exit(EXIT_FAILURE);
}
struct ThreadJob * threadJob = malloc(sizeof(struct ThreadJob));
if (!threadJob)
{
perror("[ThreadPool_AddJob()] malloc");
exit(EXIT_FAILURE);
}
threadJob->CallbackFunction = callbackFunction;
threadJob->Arg = arg;
threadJob->Next = NULL;
if (threadPool->JobQueueHead == NULL)
{
threadPool->JobQueueHead = threadJob;
threadPool->JobQueueTail = threadJob;
pthread_cond_broadcast(&(threadPool->QueueNotEmptyCond));
}
else
{
threadPool->JobQueueTail->Next = threadJob;
threadPool->JobQueueTail = threadJob;
}
threadPool->QueueCount++;
pthread_mutex_unlock(&(threadPool->Mutex));
}
void ThreadPool_Destroy(ThreadPool * threadPool)
{
if(!threadPool)
{
fprintf(stderr, "[ThreadPool_AddJob()] Argument 'threadPool' is invalid");
exit(EXIT_FAILURE);
}
pthread_mutex_lock(&(threadPool->Mutex));
if (threadPool->IsQueueClosed || threadPool->IsPoolClosed)
{
fprintf(stderr, "[ThreadPool_Destroy()] ThreadPool or Queue has been closed");
exit(EXIT_FAILURE);
}
threadPool->IsQueueClosed = true;
while (threadPool->QueueCount != 0)
pthread_cond_wait(&(threadPool->QueueEmptyCond), &(threadPool->Mutex));
threadPool->IsPoolClosed = true;
pthread_mutex_unlock(&(threadPool->Mutex));
pthread_cond_broadcast(&(threadPool->QueueNotEmptyCond));
pthread_cond_broadcast(&(threadPool->QueueNotFullCond));
for (int i = 0; i < threadPool->ThreadCount; ++i)
pthread_join(threadPool->Threads[i], NULL);
pthread_mutex_destroy(&(threadPool->Mutex));
pthread_cond_destroy(&(threadPool->QueueEmptyCond));
pthread_cond_destroy(&(threadPool->QueueNotEmptyCond));
pthread_cond_destroy(&(threadPool->QueueNotFullCond));
free(threadPool->Threads);
struct ThreadJob * p = NULL;
while (threadPool->JobQueueHead != NULL)
{
p = threadPool->JobQueueHead;
threadPool->JobQueueHead = p->Next;
free(p);
}
free(threadPool);
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)