C 中的多线程管道具有错误的管道实现

2023-12-15

我正在尝试创建一个多线程管道,将函数存储在多个线程中,并使用管道与每个线程和函数进行通信。当我运行我的程序时,它一遍又一遍地运行相同的函数,而不是单独的函数,我认为我的管道有问题,但我不太确定我到底做错了什么?

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdbool.h>
int num_ints;
typedef void (*Function)(void* input, void* output);

typedef struct Pipeline {
    Function* functions;
    int numStages;
} Pipeline;
 
Pipeline* new_Pipeline() {
    Pipeline *this = malloc(sizeof(Pipeline));
    this->functions = NULL;
    this->numStages = 0;
    printf("created the pipeline\n");
    return this;   
}

bool Pipeline_add(Pipeline* this, Function f) {
    //reallocating memory to add a stage to the functions array
    this->functions = realloc(this->functions, (this->numStages +1) * sizeof(Function));
    if (this->functions == NULL) {
        return false;
    } 
    else {
        this->functions[this->numStages] = f;
        this->numStages++;
        printf("added a stage\n");
        return true;
    }
    
}

typedef struct {
    Function function;
    int inputPipe;
    int outputPipe;
} thread_args;

void* thread_func(void* arg) {
    thread_args *data = (thread_args*) arg;

    //get the input and output pipes from the args parameter
    int inPipe = data->inputPipe;
    int outPipe = data->outputPipe;
    data->function((void*)&inPipe,(void*)&outPipe);
    return NULL;
}

void Pipeline_execute(Pipeline* this) {
    //create threads 
    pthread_t threads[this->numStages];
    thread_args args[this->numStages];
    for (int i = 0; i < this->numStages; i++) {
         //creating the pipes
        int fd[2]; 
        pipe(fd);
        //creating input and output pipes for each stage
        args[i].function = this->functions[i];
        args[i].inputPipe = fd[0];
        args[i].outputPipe = fd[1];
        if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
            printf("created a thread\n");
            perror("pthread_create\n");
        }
        if (i == this->numStages -1) {
            close(fd[0]);
        }
    }
    //waiting for threads to finish
    for (int i = 0; i < this->numStages; i++) {
        if (pthread_join(threads[i], NULL) != 0) {
            perror("pthread_join\n");
            exit(1);
        }
    }
    //closing pipes
    for (int i = 0; i < this->numStages -1; i++) {
        
    }
    
}


void Pipeline_free(Pipeline* this) {
    free(this->functions);
    free(this);
}


bool Pipeline_send(void* channel, void* buffer, size_t size) {
    if ((write(*(int*)channel, buffer, size)) != -1) {
        return true;
    } else {
        return false;
    }
    
}


bool Pipeline_receive(void* channel, void* buffer, size_t size) {
    if ((read(*(int*)channel, buffer, size)) != -1) {
        return true; 
    } else {
        return false;
    }
}
//an application created to help test the implementation of pipes.

static void generateInts(void* input, void* output) {
    printf("generateInts: thread %p\n", (void*) pthread_self());
    for (int i = 1; i <= num_ints; i++) {
        if (!Pipeline_send(output, (void*) &i, sizeof(int))) exit(EXIT_FAILURE);
    }
}


static void squareInts(void* input, void* output) {
    printf("squareInts: thread %p\n", (void*) pthread_self());
    for (int i = 1; i <= num_ints; i++) {
        int number;
        if (!Pipeline_receive(input, (void*) &number, sizeof(int))) exit(EXIT_FAILURE);
        int result = number * number;
        if (!Pipeline_send(output, (void*) &result, sizeof(int))) exit(EXIT_FAILURE);
    }
}


static void sumIntsAndPrint(void* input, void* output) {
    printf("sumIntsAndPrint: thread %p\n", (void*) pthread_self());
    int number = 0;
    int result = 0;
    for (int i = 1; i <= num_ints; i++) {
        if (!Pipeline_receive(input, (void*) &number, sizeof(int))) exit (EXIT_FAILURE);
        result += number;
    }
    printf("sumIntsAndPrint: result = %i\n", result);
}

static void cleanupExit(Pipeline *p) {
    if (p) {
        Pipeline_free(p);
    }
    exit(EXIT_FAILURE);
}


int main() {
    scanf("%d", &num_ints);
    printf("Setting up pipeline to calculate the sum of squares of integers 1 to %i.\n", num_ints);

    Pipeline *p = new_Pipeline();

    if (p == NULL) cleanupExit(p);
    if (!Pipeline_add(p, generateInts)) cleanupExit(p);
    if (!Pipeline_add(p, squareInts)) cleanupExit(p);
    if (!Pipeline_add(p, sumIntsAndPrint)) cleanupExit(p);
    Pipeline_execute(p);

    Pipeline_free(p);
    return 0;
}

我期望它在不同的线程上运行每个函数,我使用另一段代码来调试它,它创建了三个不同的线程,但每个线程的函数都是相同的。 当我运行提供的应用程序来检查管道实现时,它将此返回给我

Setting up pipeline to calculate the sum of squares of integers 1 to 10.
created the pipeline
added a stage
added a stage
added a stage
generateInts: thread 0x5677640
squareInts: thread 0x5e78640
sumIntsAndPrint: thread 0x6679640
==162384== 
==162384== HEAP SUMMARY:
==162384==     in use at exit: 584 bytes in 4 blocks
==162384==   total heap usage: 9 allocs, 5 frees, 10,096 bytes allocated
==162384==
==162384== LEAK SUMMARY:
==162384==    definitely lost: 0 bytes in 0 blocks
==162384==    indirectly lost: 0 bytes in 0 blocks
==162384==      possibly lost: 544 bytes in 2 blocks
==162384==    still reachable: 40 bytes in 2 blocks

我更新了代码


In Pipeline_execute ...

  1. 您正在使用fd来自pipe调用错误。

  2. 您正在设置输入and每个线程的输出到same管道。因此,它会写入自身(创建无限数据循环?)。

  3. 您必须“交错”管道,以便阶段 0 的输出进入阶段 1 的输入(例如)。

此外,您的线程函数必须在从目标函数返回后关闭管道单元。


这是重构的[工作?]代码。特别要注意以下方面的变化Pipeline_execute。此外,这些变化thread_func.

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdbool.h>

#define CLOSEME(_fd) \
    do { \
        if (_fd < 0) \
            break; \
        close(_fd); \
        _fd = -1; \
    } while (0)

int num_ints;
typedef void (*Function) (void *input, void *output);

typedef struct Pipeline {
    Function *functions;
    int numStages;
} Pipeline;

Pipeline *
new_Pipeline()
{
    Pipeline *this = malloc(sizeof(Pipeline));

    this->functions = NULL;
    this->numStages = 0;
    printf("created the pipeline\n");
    return this;
}

bool
Pipeline_add(Pipeline *this, Function f)
{
    // reallocating memory to add a stage to the functions array
    this->functions = realloc(this->functions, (this->numStages + 1) * sizeof(Function));
    if (this->functions == NULL) {
        return false;
    }
    else {
        this->functions[this->numStages] = f;
        this->numStages++;
        printf("added a stage\n");
        return true;
    }

}

typedef struct {
    Function function;
    int inputPipe;
    int outputPipe;
} thread_args;

void *
thread_func(void *arg)
{
    thread_args *data = (thread_args *) arg;

    // get the input and output pipes from the args parameter
    int inPipe = data->inputPipe;
    int outPipe = data->outputPipe;

    data->function((void *) &inPipe, (void *) &outPipe);

    CLOSEME(data->inputPipe);
    CLOSEME(data->outputPipe);

    return NULL;
}

void
Pipeline_execute(Pipeline *this)
{
    // create threads
    pthread_t threads[this->numStages];
    thread_args args[this->numStages];

    int fd[2] = { -1, -1 };

    for (int i = 0; i < this->numStages; i++) {
        // use input side from _prior_ stage for our input
        args[i].inputPipe = fd[0];

        // last stage has _no_ output
        if (i == this->numStages - 1) {
            fd[0] = -1;
            fd[1] = -1;
        }
        else
            pipe(fd);

        // set output for _this_ stage from the pipe output
        args[i].outputPipe = fd[1];

        // creating input and output pipes for each stage
        args[i].function = this->functions[i];
        if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
            printf("created a thread\n");
            perror("pthread_create\n");
        }
    }

    // waiting for threads to finish
    for (int i = 0; i < this->numStages; i++) {
        if (pthread_join(threads[i], NULL) != 0) {
            perror("pthread_join\n");
            exit(1);
        }
    }

    // closing pipes
    // Now done in thread_func
    for (int i = 0; i < this->numStages - 1; i++) {
    }
}

void
Pipeline_free(Pipeline *this)
{
    free(this->functions);
    free(this);
}

bool
Pipeline_send(void *channel, void *buffer, size_t size)
{
    if ((write(*(int *) channel, buffer, size)) != -1) {
        return true;
    }
    else {
        return false;
    }

}

bool
Pipeline_receive(void *channel, void *buffer, size_t size)
{
    if ((read(*(int *) channel, buffer, size)) != -1) {
        return true;
    }
    else {
        return false;
    }
}

//an application created to help test the implementation of pipes.

static void
generateInts(void *input, void *output)
{
    printf("generateInts: thread %p\n", (void *) pthread_self());
    for (int i = 1; i <= num_ints; i++) {
        if (!Pipeline_send(output, (void *) &i, sizeof(int)))
            exit(EXIT_FAILURE);
    }
}

static void
squareInts(void *input, void *output)
{
    printf("squareInts: thread %p\n", (void *) pthread_self());
    for (int i = 1; i <= num_ints; i++) {
        int number;

        if (!Pipeline_receive(input, (void *) &number, sizeof(int)))
            exit(EXIT_FAILURE);
        int result = number * number;

        if (!Pipeline_send(output, (void *) &result, sizeof(int)))
            exit(EXIT_FAILURE);
    }
}

static void
sumIntsAndPrint(void *input, void *output)
{
    printf("sumIntsAndPrint: thread %p\n", (void *) pthread_self());
    int number = 0;
    int result = 0;

    for (int i = 1; i <= num_ints; i++) {
        if (!Pipeline_receive(input, (void *) &number, sizeof(int)))
            exit(EXIT_FAILURE);
        result += number;
    }
    printf("sumIntsAndPrint: result = %i\n", result);
}

static void
cleanupExit(Pipeline * p)
{
    if (p) {
        Pipeline_free(p);
    }
    exit(EXIT_FAILURE);
}

int
main()
{
    printf("Enter number of ints:\n");
    scanf("%d", &num_ints);
    printf("Setting up pipeline to calculate the sum of squares of integers 1 to %i.\n", num_ints);

    Pipeline *p = new_Pipeline();

    if (p == NULL)
        cleanupExit(p);
    if (!Pipeline_add(p, generateInts))
        cleanupExit(p);
    if (!Pipeline_add(p, squareInts))
        cleanupExit(p);
    if (!Pipeline_add(p, sumIntsAndPrint))
        cleanupExit(p);
    Pipeline_execute(p);

    Pipeline_free(p);
    return 0;
}

这是程序输出:

Enter number of ints:
7
Setting up pipeline to calculate the sum of squares of integers 1 to 7.
created the pipeline
added a stage
added a stage
added a stage
generateInts: thread 0x7fc5c10a4700
squareInts: thread 0x7fc5c08a3700
sumIntsAndPrint: thread 0x7fc5c00a2700
sumIntsAndPrint: result = 140

这是基于我的回答:fd 泄漏,自定义 Shell它正在使用fork代替pthread_create但问题是相似的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

C 中的多线程管道具有错误的管道实现 的相关文章

  • Java 中的 TreeSet 与 C#.net 的等效项

    我有 Java 代码 其中包含TreeSet 我想将代码转换为 C 我可以使用哪个等效集合 如果没有 请提出替代方案 那将是系统 集合 通用 SortedSet
  • 从变量使用 OLE DB 源命令的 EzAPI 等效项是什么?

    tl dr 使用 来自变量的 SQL 命令 数据访问模式的 OLE DB 源并分配变量的 EzAPI 代码是什么 Preamble 每月一次 我们需要使用生产数据的子集刷新我们的公共测试站点 我们已确定 根据我们的需求 SSIS 解决方案最
  • 在 MS word .docs 中插入 MathMl 、 Tex 或 LaTex 方程?

    我一直在寻找 C 中 Net 的一些 dll 用于在 MS Word 中插入任何 MathMl Tex 或 LaTex 方程 我尝试了一些类似 DocX 的工具 但它们失败了 你知道吗 或任何提示如何做到这一点 谢谢 Word 2007 可
  • 创建动态对象

    如何动态创建对象 string columnNames EmpName EmpID PhoneNo List
  • 如何在 VS 2013 的立即窗口中执行 LINQ 和/或 foreach?

    在调试过程中探测当前状态时 立即窗口是非常有用的工具 我了解到 通过使用问号 人们可以在那里做更多的事情 如图所示在这篇文章中 https stackoverflow com questions 32934635 execute metho
  • 检查文件是真实文件还是符号链接

    有没有办法使用 C 来判断文件是真实文件还是符号链接 我已经挖过了MSDN W32 文档 https learn microsoft com en us windows win32 fileio file management functi
  • 未构建 csproj 时抑制 AfterBuild 目标

    我在 MSBuild 中有一个构建后目标来复制一些构建输出 这是 linkedin 作为对AfterBuild目标 暴露于Microsoft CSharp targets
  • 要实现 XML 可序列化,从 ICollection 继承的类型必须具有 Add 的实现

    我有来自现有项目的 CSLA 1 x 框架 对象 我试图在新的 Net 4 0 项目中使用它 这些对象正在生产中使用 如果没有 2 组对象 我确实无法将它们转换为 2 x 或 EF 在我的 c webservice 中 当我尝试运行它时 我
  • GridView必须添加到表单标签中才能渲染

    TextWriter tr new StringWriter HtmlTextWriter writer new HtmlTextWriter tr HtmlForm form new HtmlForm form Controls Add
  • 当 MSB 位等于 0 时如何以十六进制格式打印它们

    我需要使用打印变量HEX格式 问题是当我的变量很小时 MSB 等于 0 因此不会打印它们 ex uint16 t var 10 0x000A h gt 我需要打印 000A 但无论我做什么它总是打印 A 我怎样才能让它发挥作用 您可以添加前
  • 如何将 Activator.CreateInstance 与字符串一起使用?

    在我的反射代码中 我的通用代码部分遇到了问题 特别是当我使用字符串时 var oVal object Test var oType oVal GetType var sz Activator CreateInstance oType oVa
  • Time 方法在另一个线程中执行并在超时时中止

    您好 我正在尝试异步运行方法 以便计算持续时间并在超过超时时取消该方法 我尝试使用异步和等待来实现这一点 但没有运气 也许我过度设计了这个 任何输入都会受到赞赏 应该注意的是 我无法更改接口 TheirInterface 因此得名 到目前为
  • 如何在调试时轻松查看事件订阅数量?

    在调试时 我可以查看一下textBox1 TextChanged查看事件订阅数量 如果是 那么我该如何钻取它 我需要知道在给定时间有多少订阅进行调试 因为看起来一个事件被多次触发 但我怀疑这个错误确实是因为textBox1 TextChan
  • 计算距离早上 8 点还有多少小时

    我知道如何计算两个日期之间的差异 但如何计算给定日期与下一个上午 8 点之间的时间 var now DateTime Now var tomorrow8am now AddDays 1 Date AddHours 8 double tota
  • 在 C++ 泛型编程中处理 void 赋值

    我有 C 代码 它包装任意 lambda 并返回 lambda 的结果 template
  • 从多页 tiff 中提取帧 - C#

    有一个多页 tiff 我想从此 Tiff 文件中提取第 n 页 帧 n 并保存它 如果我的多页 tiff 有 3 帧 在我提取一页 帧后 我想留下 1 张图像有 2 页 帧 并且 1 张图像只有 1 页 帧 下面是一些代码 用于将多帧 ti
  • 解析 SWIG 接口文件的结构属性

    这是我不久前问过的问题的延续 为通过参数返回的函数创建类型映射 https stackoverflow com questions 12793973 create a typemap for a function that returns
  • ASP .NET Core IIS 托管用户身份名称为空且 IsAuthenticated=false

    我在 IIS 上运行 ASP NET Core dll 使用 AspNetCoreModule 使用以前的 ASP NET 我可以通过以下方式获取用户身份名称 HttpContext Current User Identity Name 因
  • 使用抽象类作为模板类型

    我对c 还是很陌生 来自java 我有一个 stl 类型列表Actor When Actor仅包含 真实 方法就没有问题 我现在想将这个类扩展到几个类 并且需要将一些方法更改为抽象的 因为它们不再具有具体的意义 正如我 从文档中 预期的那样
  • qt 如何知道按钮被点击?

    我正在尝试编写一个程序 用声音进行一些操作 我的问题是我有 3 个播放按钮和 3 个标签 我希望无论我单击 播放 按钮 都应该播放按钮附近标签中名称的声音 我有一个没有任何参数的播放插槽 那么 如何分别连接到每个播放按钮和每个标签呢 实际上

随机推荐