如何通过pthreads管理两个或多个消费者?

2024-05-07

我有一个正在寻求解决的通用问题,即从标准输入或常规文件流发送到应用程序的二进制数据块,应用程序又将二进制数据转换为文本。使用线程,我想在将文本传输到下一个应用程序之前对其进行处理,该应用程序会进一步修改该文本,依此类推。

作为一个简单的测试用例,我想通过提取压缩数据gunzip。具体来说,我正在考虑使用gunzip -c -提取通过其发送给它的二进制数据块(重新分配)stdin文件描述符,然后从其中提取文本块(重新分配)stdout文件描述符。然后我可以将这些文本块打印到真实的stdout or stderr(或者稍后做其他事情)。

(我意识到我可以做gzip基于命令行的压缩和提取。我的目标是使用此测试用例来学习如何在线程之间正确传递二进制和文本数据的通用块,这些线程要么通过二进制文件运行该数据,要么进一步处理它。)

就我的测试程序而言,我设置了三个pthread_t线程:

  • produce_gzip_chunk_thread
  • consume_gzip_chunk_thread
  • consume_gunzip_chunk_thread

我向每个线程传递一个名为的共享数据实例thread_data,其中包含一个线程锁、两个条件以及一些缓冲区和计数器变量。我还包括一组文件描述符gunzip进程打开于popen3():

typedef struct pthread_data pthread_data_t;
typedef struct popen3_desc popen3_desc_t;

struct pthread_data {
    pthread_mutex_t in_lock;
    pthread_cond_t in_cond;
    pthread_cond_t out_cond;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes;
    size_t n_in_bytes_written_to_gunzip;
    size_t n_out_bytes_read_from_gunzip;
    FILE *in_file_ptr;
    boolean in_eof;
    char in_line[LINE_LENGTH_VALUE];
    popen3_desc_t *gunzip_ptr;
};

struct popen3_desc {
    int in;
    int out;
    int err;
};

The produce_gzip_chunk_thread读取 1024 字节的块gzip-来自名为的常规文件的压缩字节foo.gz.

这些字节被写入unsigned char称为缓冲区in_buf,这是我传递给每个线程的共享数据结构的一部分:

void * produce_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> produce_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes = 0;

    d->in_eof = kFalse;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        n_in_bytes = fread(in_buf, sizeof(in_buf[0]), sizeof(in_buf), d->in_file_ptr);
        if (n_in_bytes > 0) {
            while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
                pthread_cond_wait(&d->in_cond, &d->in_lock);
            memcpy(d->in_buf, in_buf, n_in_bytes);
            d->n_in_bytes = n_in_bytes;
#ifdef DEBUG
            fprintf(stderr, "Debug: ######## [%07zu] produced chunk\n", d->n_in_bytes);
#endif
            pthread_cond_signal(&d->in_cond);
        }
        else if (feof(d->in_file_ptr) || ferror(d->in_file_ptr))
            break;
    } 
    d->in_eof = kTrue;
    pthread_mutex_unlock(&d->in_lock);
    pthread_cond_signal(&d->in_cond);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> produce_gzip_chunk()\n");
#endif
    return NULL;
}

一旦有正数的字节存储在n_bytes——也就是说,我们从输入中提取了数据gzip需要处理的存档gunzip— 这会触发允许第二个线程的条件consume_gzip_chunk_thread操作:

void * consume_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_in_bytes_written_to_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        while (d->n_in_bytes == 0 && !d->in_eof)
            pthread_cond_wait(&d->in_cond, &d->in_lock);
        if (d->n_in_bytes) {
#ifdef DEBUG
            fprintf(stderr, "Debug: ........ [%07zu] processing chunk\n", d->n_in_bytes);
#endif
            if (!d->gunzip_ptr) {
#ifdef DEBUG
                fprintf(stderr, "Debug: * setting up gunzip ptr\n");
#endif
                d->gunzip_ptr = malloc(sizeof(popen3_desc_t));
                if (!d->gunzip_ptr) {
                    fprintf(stderr, "Error: Could not create gunzip file handle struct\n");
                    exit(EXIT_FAILURE);
                }

                popen3("gunzip -c -", 
                       &(d->gunzip_ptr->in), 
                       &(d->gunzip_ptr->out), 
                       &(d->gunzip_ptr->err), 
                       kTrue, 
                       kTrue);
                memset(d->in_line, 0, LINE_LENGTH_VALUE);
            }
            n_in_bytes_written_to_gunzip = (long) write(d->gunzip_ptr->in, d->in_buf, d->n_in_bytes);
#ifdef DEBUG
            fprintf(stderr, "Debug: ................ wrote [%07ld] bytes into the gunzip process\n", n_in_bytes_written_to_gunzip);
#endif
            if (n_in_bytes_written_to_gunzip > 0)
                d->n_in_bytes_written_to_gunzip = n_in_bytes_written_to_gunzip;

            d->n_in_bytes = 0;
            pthread_cond_signal(&d->out_cond);
        }
        if (d->in_eof) 
            break;
    } 
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> consume_gzip_chunk()\n");
#endif
    return NULL;
}

食用时gzip数据块,我们使用write发送功能n_bytes of in_buf to the gunzip进程的输入文件描述符。最后,我们发送另一个线程信号,但这一次是为了out_cond,以帮助重新唤醒consume_gunzip_chunk_thread,它读取自gunzip的输出来做更多的工作:

void * consume_gunzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gunzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_out_bytes_read_from_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        while (d->n_in_bytes_written_to_gunzip == 0) {
            pthread_cond_wait(&d->out_cond, &d->in_lock);
        }
        if (d->n_in_bytes_written_to_gunzip) {
            sleep(1);
            n_out_bytes_read_from_gunzip = read(d->gunzip_ptr->out, d->in_line, LINE_LENGTH_VALUE);
#ifdef DEBUG
            fprintf(stderr, "Debug: ------------------------ read [%07ld] bytes out from the gunzip process\n", n_out_bytes_read_from_gunzip);
            fprintf(stderr, "Debug: ------------------------ gunzip output chunk:\n[%s]\n", d->in_line);
#endif
            memset(d->in_line, 0, strlen(d->in_line));
            if (n_out_bytes_read_from_gunzip > 0)
                d->n_out_bytes_read_from_gunzip = n_out_bytes_read_from_gunzip;
            d->n_in_bytes_written_to_gunzip = 0;
            pthread_cond_signal(&d->in_cond);
        }
        if (d->in_eof && (d->n_in_bytes_written_to_gunzip == 0))
            break;
    }
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> consume_gunzip_chunk()\n");
#endif
    return NULL;
}

这试图read任何可用字节gunzip进程的输出文件描述符。出于调试目的,我只想将它们打印到stderr目前。

我面临的问题是我需要添加一个sleep(1)中的声明consume_gunzip_chunk,在做之前read,为了让事情正常工作。

没有这个sleep(1)声明中,我的测试程序通常不会输出任何内容——除非每 8-10 次尝试一次,当压缩数据被正确提取时。

Question- 我对条件的安排做错了什么,使得sleep(1)需要致电才能进行gzip- 提取工作正常吗?在生产场景中,处理更大的输入文件时,每 1kB 强制等待一秒似乎是一个坏主意。


为了重现完整的源代码,这里有两个相关文件。这是标题:

/*
 * convert.h
 */

#ifndef CONVERT_H
#define CONVERT_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <getopt.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>

#define CB_VERSION "1.0"
#define LINE_LENGTH_VALUE 65536
#define BUF_LENGTH_VALUE 1024
#define POPEN3_READ 0
#define POPEN3_WRITE 1

typedef int boolean;
extern const boolean kTrue;
extern const boolean kFalse;
const boolean kTrue = 1;
const boolean kFalse = 0;

typedef enum {
    kGzip,
    kUnknown
} format_t;

typedef struct pthread_data pthread_data_t;
typedef struct popen3_desc popen3_desc_t;

struct pthread_data {
    pthread_mutex_t in_lock;
    pthread_cond_t in_cond;
    pthread_cond_t out_cond;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes;
    size_t n_in_bytes_written_to_gunzip;
    size_t n_out_bytes_read_from_gunzip;
    boolean in_eof;
    FILE *in_file_ptr;
    popen3_desc_t *gunzip_ptr;
    char in_line[LINE_LENGTH_VALUE];
};

struct popen3_desc {
    int in;
    int out;
    int err;
};

static const char *name = "convert";
static const char *version = CB_VERSION;
static const char *authors = "Alex Reynolds";
static const char *usage = "\n" \
    "Usage: convert --input-format=str <input-file>\n" \
    "  Process Flags:\n\n" \
    "  --input-format=str            | -f str  Input format (str = [ gzip ]; required)\n" \
    "  --help                        | -h      Show this usage message\n";

static struct convert_globals_t {
    char *input_format_str;
    format_t input_format;
    char **filenames;
    int num_filenames;
} convert_globals;

static struct option convert_client_long_options[] = {
    { "input-format",           required_argument,  NULL,   'f' },
    { "help",               no_argument,        NULL,   'h' },
    { NULL,             no_argument,        NULL,    0  }
}; 

static const char *convert_client_opt_string = "f:h?";

void * consume_gunzip_chunk        (void *t_data);
void * consume_gzip_chunk          (void *t_data);
void * produce_gzip_chunk          (void *t_data);
FILE * new_file_ptr                (const char *in_fn);
void   delete_file_ptr             (FILE **file_ptr);
pid_t  popen3                      (const char *command, 
                                    int *in_desc, 
                                    int *out_desc, 
                                    int *err_desc, 
                                    boolean nonblock_in, 
                                    boolean nonblock_outerr);
off_t  fsize                       (const char *fn);
void   initialize_globals          ();
void   parse_command_line_options  (int argc, 
                                    char **argv);
void   print_usage                 (FILE *stream);

#endif

这是实现:

/*
 * convert.c
 */

#include "convert.h"

int main(int argc, char **argv)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> main()\n");
#endif

    pthread_t produce_gzip_chunk_thread = NULL;
    pthread_t consume_gzip_chunk_thread = NULL;
    pthread_t consume_gunzip_chunk_thread = NULL;
    pthread_data_t *thread_data = NULL;

    parse_command_line_options(argc, argv);

    /* initialize thread data */
    thread_data = malloc(sizeof(pthread_data_t));
    thread_data->n_in_bytes = 0;
    thread_data->n_in_bytes_written_to_gunzip = 0;
    thread_data->n_out_bytes_read_from_gunzip = 0;
    thread_data->in_eof = kFalse;
    thread_data->in_file_ptr = new_file_ptr(convert_globals.filenames[0]);
    pthread_mutex_init(&(thread_data->in_lock), NULL);
    pthread_cond_init(&(thread_data->in_cond), NULL);
    pthread_cond_init(&(thread_data->out_cond), NULL);

    /* parse input */
    if (convert_globals.input_format == kGzip) 
        {
            if (pthread_create(&produce_gzip_chunk_thread, NULL, produce_gzip_chunk, (void *) thread_data) != 0) {
                fprintf(stderr, "Error: Could not create gzip chunk production thread\n");
                return EXIT_FAILURE;
            }
            if (pthread_create(&consume_gzip_chunk_thread, NULL, consume_gzip_chunk, (void *) thread_data) != 0) {
                fprintf(stderr, "Error: Could not create gzip chunk consumption thread\n");            
                return EXIT_FAILURE;
            }
            if (pthread_create(&consume_gunzip_chunk_thread, NULL, consume_gunzip_chunk, (void *) thread_data) != 0) {
                fprintf(stderr, "Error: Could not create gunzip chunk consumption thread\n");            
                return EXIT_FAILURE;
            }
            if (pthread_join(produce_gzip_chunk_thread, NULL) != 0) {
                fprintf(stderr, "Error: Could not join gzip chunk production thread\n");
                return EXIT_FAILURE;
            }
            if (pthread_join(consume_gzip_chunk_thread, NULL) != 0) {
                fprintf(stderr, "Error: Could not join gzip chunk consumption thread\n");
                return EXIT_FAILURE;
            }
            if (pthread_join(consume_gunzip_chunk_thread, NULL) != 0) {
                fprintf(stderr, "Error: Could not join gunzip chunk consumption thread\n");
                return EXIT_FAILURE;
            }
        }
    else
        {
            /* 
               handle text formats
            */
        }

    /* cleanup */
    delete_file_ptr(&thread_data->in_file_ptr);
    pthread_mutex_destroy(&(thread_data->in_lock));
    pthread_cond_destroy(&(thread_data->in_cond));
    pthread_cond_destroy(&(thread_data->out_cond));
    free(thread_data);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> main()\n");
#endif
    return EXIT_SUCCESS;
}

void * consume_gunzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gunzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_out_bytes_read_from_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        while (d->n_in_bytes_written_to_gunzip == 0) {
            pthread_cond_wait(&d->out_cond, &d->in_lock);
        }
        if (d->n_in_bytes_written_to_gunzip) {
            sleep(1);
            n_out_bytes_read_from_gunzip = read(d->gunzip_ptr->out, d->in_line, LINE_LENGTH_VALUE);
#ifdef DEBUG
            fprintf(stderr, "Debug: ------------------------ read [%07ld] bytes out from the gunzip process\n", n_out_bytes_read_from_gunzip);
            fprintf(stderr, "Debug: ------------------------ gunzip output chunk:\n[%s]\n", d->in_line);
#endif
            memset(d->in_line, 0, strlen(d->in_line));
            if (n_out_bytes_read_from_gunzip > 0)
                d->n_out_bytes_read_from_gunzip = n_out_bytes_read_from_gunzip;
            d->n_in_bytes_written_to_gunzip = 0;
            pthread_cond_signal(&d->in_cond);
        }
        if (d->in_eof && (d->n_in_bytes_written_to_gunzip == 0))
            break;
    }
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> consume_gunzip_chunk()\n");
#endif
    return NULL;
}

void * consume_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_in_bytes_written_to_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        while (d->n_in_bytes == 0 && !d->in_eof)
            pthread_cond_wait(&d->in_cond, &d->in_lock);
        if (d->n_in_bytes) {
#ifdef DEBUG
            fprintf(stderr, "Debug: ........ [%07zu] processing chunk\n", d->n_in_bytes);
#endif
            if (!d->gunzip_ptr) {
#ifdef DEBUG
                fprintf(stderr, "Debug: * setting up gunzip ptr\n");
#endif
                d->gunzip_ptr = malloc(sizeof(popen3_desc_t));
                if (!d->gunzip_ptr) {
                    fprintf(stderr, "Error: Could not create gunzip file handle struct\n");
                    exit(EXIT_FAILURE);
                }

                popen3("gunzip -c -", 
                       &(d->gunzip_ptr->in), 
                       &(d->gunzip_ptr->out), 
                       &(d->gunzip_ptr->err), 
                       kTrue, 
                       kTrue);
                memset(d->in_line, 0, LINE_LENGTH_VALUE);
            }
            n_in_bytes_written_to_gunzip = (long) write(d->gunzip_ptr->in, d->in_buf, d->n_in_bytes);
#ifdef DEBUG
            fprintf(stderr, "Debug: ................ wrote [%07ld] bytes into the gunzip process\n", n_in_bytes_written_to_gunzip);
#endif
            if (n_in_bytes_written_to_gunzip > 0)
                d->n_in_bytes_written_to_gunzip = n_in_bytes_written_to_gunzip;

            d->n_in_bytes = 0;
            /* pthread_cond_signal(&d->in_cond); */
            pthread_cond_signal(&d->out_cond);
        }
        if (d->in_eof) 
            break;
    } 
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> consume_gzip_chunk()\n");
#endif
    return NULL;
}

void * produce_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> produce_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes = 0;

    d->in_eof = kFalse;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        n_in_bytes = fread(in_buf, sizeof(in_buf[0]), sizeof(in_buf), d->in_file_ptr);
        if (n_in_bytes > 0) {
            while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
                pthread_cond_wait(&d->in_cond, &d->in_lock);
            memcpy(d->in_buf, in_buf, n_in_bytes);
            d->n_in_bytes = n_in_bytes;
#ifdef DEBUG
            fprintf(stderr, "Debug: ######## [%07zu] produced chunk\n", d->n_in_bytes);
#endif
            pthread_cond_signal(&d->in_cond);
        }
        else if (feof(d->in_file_ptr) || ferror(d->in_file_ptr))
            break;
    } 
    d->in_eof = kTrue;
    pthread_mutex_unlock(&d->in_lock);
    pthread_cond_signal(&d->in_cond);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> produce_gzip_chunk()\n");
#endif
    return NULL;
}

FILE * new_file_ptr(const char *in_fn)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> new_file_ptr()\n");
#endif

    FILE *file_ptr = NULL;
    boolean not_stdin = kTrue;

    not_stdin = strcmp(in_fn, "-");
    file_ptr = (not_stdin) ? fopen(in_fn, "r") : stdin;

    if (!file_ptr) {
        fprintf(stderr, "Error: Could not open input stream\n");
        exit(EXIT_FAILURE);
    }

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> new_file_ptr()\n");
#endif
    return file_ptr;
}

void delete_file_ptr(FILE **file_ptr)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> delete_file_ptr()\n");
#endif

    fclose(*file_ptr);
    *file_ptr = NULL;

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> delete_file_ptr()\n");
#endif
}

pid_t popen3(const char *command, int *in_desc, int *out_desc, int *err_desc, boolean nonblock_in, boolean nonblock_outerr)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> popen3()\n");
#endif

    int p_stdin[2], p_stdout[2], p_stderr[2];
    pid_t pid;

    if (pipe(p_stdin) != 0 || pipe(p_stdout) != 0 || pipe(p_stderr) != 0)
        return -1;

    if (nonblock_in) {
        fcntl(p_stdin[POPEN3_WRITE], F_SETFL, fcntl(p_stdin[POPEN3_WRITE], F_GETFL) | O_NONBLOCK);
    }

    if (nonblock_outerr) {
        fcntl(p_stdout[POPEN3_READ], F_SETFL, fcntl(p_stdout[POPEN3_READ], F_GETFL) | O_NONBLOCK);
        fcntl(p_stderr[POPEN3_READ], F_SETFL, fcntl(p_stderr[POPEN3_READ], F_GETFL) | O_NONBLOCK);
    }

    pid = fork();
    if (pid < 0)
        return pid; /* error */

    if (pid == 0) {
        close(p_stdin[POPEN3_WRITE]);
        close(p_stdout[POPEN3_READ]);
        close(p_stderr[POPEN3_READ]);
        dup2(p_stdin[POPEN3_READ], fileno(stdin));
        dup2(p_stdout[POPEN3_WRITE], fileno(stderr));
        dup2(p_stdout[POPEN3_WRITE], fileno(stdout));
        execl("/bin/sh", "sh", "-c", command, NULL);
        fprintf(stderr, "Error: Could not execl [%s]\n", command);
        exit(EXIT_FAILURE);
    }

    if (in_desc == NULL)
        close(p_stdin[POPEN3_WRITE]);
    else
        *in_desc = p_stdin[POPEN3_WRITE];

    if (out_desc == NULL)
        close(p_stdout[POPEN3_READ]);
    else
        *out_desc = p_stdout[POPEN3_READ];

    if (err_desc == NULL)
        close(p_stderr[POPEN3_READ]);
    else
        *err_desc = p_stderr[POPEN3_READ];

#ifdef DEBUG
    fprintf(stderr, "Debug: New *in_desc  = %d\n", *in_desc);
    fprintf(stderr, "Debug: New *out_desc = %d\n", *out_desc);
    fprintf(stderr, "Debug: New *err_desc = %d\n", *err_desc);
#endif

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> popen3()\n");
#endif
    return pid;
}

off_t fsize(const char *fn) 
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> fsize()\n");
#endif

    struct stat st; 

    if (stat(fn, &st) == 0)
        return st.st_size;

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> fsize()\n");
#endif
    return EXIT_FAILURE; 
}

void initialize_globals()
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> initialize_globals()\n");
#endif

    convert_globals.input_format = kUnknown;
    convert_globals.filenames = NULL;
    convert_globals.num_filenames = 0;

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> initialize_globals()\n");
#endif
}

void parse_command_line_options(int argc, char **argv)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> parse_command_line_options()\n");
#endif

    int client_long_index;
    int client_opt = getopt_long(argc, 
                                 argv, 
                                 convert_client_opt_string, 
                                 convert_client_long_options, 
                                 &client_long_index);
    char *in_format_str = NULL;

    opterr = 0; /* disable error reporting by GNU getopt */
    initialize_globals();

    while (client_opt != -1) 
        {
            switch (client_opt) 
                {
                case 'f':
                    in_format_str = optarg;
                    break;
                case 'h':
                    print_usage(stdout);
                    exit(EXIT_SUCCESS);
                case '?':
                    print_usage(stdout);
                    exit(EXIT_SUCCESS);
                default:
                    break;
                }
            client_opt = getopt_long(argc, 
                                     argv, 
                                     convert_client_opt_string, 
                                     convert_client_long_options, 
                                     &client_long_index);
        }

    convert_globals.filenames = argv + optind;
    convert_globals.num_filenames = argc - optind;    

    if (!in_format_str) {
        fprintf(stderr, "Error: Specified input format was omitted; please specify one of required input formats\n");
        print_usage(stderr);
        exit(EXIT_FAILURE);
    }
    else if (convert_globals.num_filenames != 1) {
        fprintf(stderr, "Error: Please specify an input file (either a regular file or '-' for stdin\n");
        print_usage(stderr);
        exit(EXIT_FAILURE);
    }

    /* map format string to setting */
    if (strcmp(in_format_str, "gzip") == 0)
        convert_globals.input_format = kGzip;
    else {
        fprintf(stderr, "Error: Specified input format is unknown; please specify one of required input formats\n");
        print_usage(stderr);
        exit(EXIT_FAILURE);
    }

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> parse_command_line_options()\n");
#endif
}

void print_usage(FILE *stream)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> print_usage()\n");
#endif

    fprintf(stream, 
            "%s\n" \
            "  version: %s\n" \
            "  author:  %s\n" \
            "%s\n", 
            name, 
            version,
            authors,
            usage);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> print_usage()\n");
#endif
}

这是构建过程:

$ mkdir -p objects
$ cc -Wall -Wextra -pedantic -std=c99 -D__STDC_CONSTANT_MACROS -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE=1 -DDEBUG=1 -g -O0 -fno-inline -c convert.c -o objects/convert.o -iquote${PWD}                                                        
$ cc -Wall -Wextra -pedantic -std=c99 -D__STDC_CONSTANT_MACROS -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE=1 -DDEBUG=1 -g -O0 -fno-inline objects/convert.o -o convert -lpthread

我已经能够在 OS X 和 Linux 主机上使用相当现代的编译环境构建此测试代码。

预先感谢您提供任何有用的建议!


我首先要说的是,我觉得 pthreads 条件和互斥体在这里并不是真正必要的,非阻塞 I/O 也不是对您所描述的问题的最佳反应。

在我看来,您所描述的无条件和无互斥版本的问题是忘记的症状close()孜孜不倦地结束管道的末端,结果是为子进程提供管道的写入结束文件描述符的副本stdin活着泄漏(到那个孩子或其他人身上)。

然后,由于stdin的读端对应的写端仍然存在,系统不会给出EOF,而是无限期阻塞。

就你而言,你did防止管道端文件描述符泄漏到生成的子级(使用正确的close()调用子端fork()在你的popen3(),虽然你忘记了close()错误的末端管道在父端结束)。但是,您并没有阻止这种泄漏所有其他孩子!如果你打电话popen3()两次,可以防止将三个描述符的集合泄漏到子级中,但由于父级仍然拥有它们,因此当下一次调用时popen3()发生后,fork()现在有6要关闭的文件描述符(旧的三个集合和刚刚创建的新的三个集合)。

因此,在您的情况下,您应该在这些管道末端设置 close-on-exec 标志,因此:

fcntl(fdIn [PIPEWR], F_SETFD, fcntl(fdIn [PIPEWR], F_GETFD) | FD_CLOEXEC);
fcntl(fdOut[PIPERD], F_SETFD, fcntl(fdOut[PIPERD], F_GETFD) | FD_CLOEXEC);
fcntl(fdErr[PIPERD], F_SETFD, fcntl(fdErr[PIPERD], F_GETFD) | FD_CLOEXEC);

下面的代码生成 6 个线程和 3 个进程,并在内部压缩然后解压缩后将其未经修改的输入传递到输出。它有效地实现了gzip -c - | XOR 0x55 | XOR 0x55 | gunzip -c - | cat, where:

  1. 标准输入被馈送到gzip通过线程srcThrd.
  2. gzip的输出由线程读取a2xor0Thrd并喂给线程xor0Thrd.
  3. Thread xor0Thrd对其输入进行异或0x55在将其传递给线程之前xor1Thrd.
  4. Thread xor1Thrd对其输入进行异或0x55在将其传递给线程之前xor22BThrd.
  5. Thread xor22BThrd将其输入提供给流程gunzip.
  6. Process gunzip将其输出直接(不通过线程)提供给cat
  7. Process cat的输出由线程读取dstThrd并打印到标准输出。

压缩是通过进程间管道通信完成的,而异或是通过进程内管道通信完成的。不使用互斥体或条件变量。main()非常容易理解。这段代码应该很容易扩展到您的情况。

/* Includes */
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <fcntl.h>



/* Defines */
#define PIPERD 0
#define PIPEWR 1




/* Data structures */
typedef struct PIPESET{
    int Ain[2];
    int Aout[2];
    int Aerr[2];
    int xor0[2];
    int xor1[2];
    int xor2[2];
    int Bin[2];
    int BoutCin[2];
    int Berr[2];
    int Cout[2];
    int Cerr[2];
} PIPESET;




/* Function Implementations */

/**
 * Source thread main method.
 * 
 * Slurps from standard input and feeds process A.
 */

void* srcThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;

    char c;
    while(read(0, &c, 1) > 0){
        write(pipeset->Ain[PIPEWR], &c, 1);
    }

    close(pipeset->Ain[PIPEWR]);

    pthread_exit(NULL);
}

/**
 * A to XOR0 thread main method.
 * 
 * Manually pipes from standard output of process A to input of thread XOR0.
 */

void* a2xor0ThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;

    char    buf[65536];
    ssize_t bytesRead;

    while((bytesRead = read(pipeset->Aout[PIPERD], buf, 65536)) > 0){
        write(pipeset->xor0[PIPEWR], buf, bytesRead);
    }

    close(pipeset->xor0[PIPEWR]);

    pthread_exit(NULL);
}

/**
 * XOR0 thread main method.
 * 
 * XORs input with 0x55 and outputs to input of XOR1.
 */

void* xor0ThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;

    char c;
    while(read(pipeset->xor0[PIPERD], &c, 1) > 0){
        c ^= 0x55;
        write(pipeset->xor1[PIPEWR], &c, 1);
    }

    close(pipeset->xor1[PIPEWR]);

    pthread_exit(NULL);
}

/**
 * XOR1 thread main method.
 * 
 * XORs input with 0x55 and outputs to input of process B.
 */

void* xor1ThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;

    char c;
    while(read(pipeset->xor1[PIPERD], &c, 1) > 0){
        c ^= 0x55;
        write(pipeset->xor2[PIPEWR], &c, 1);
    }

    close(pipeset->xor2[PIPEWR]);

    pthread_exit(NULL);
}

/**
 * XOR2 to B thread main method.
 * 
 * Manually pipes from input (output of XOR1) to input of process B.
 */

void* xor22BThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;

    char    buf[65536];
    ssize_t bytesRead;

    while((bytesRead = read(pipeset->xor2[PIPERD], buf, 65536)) > 0){
        write(pipeset->Bin[PIPEWR], buf, bytesRead);
    }

    close(pipeset->Bin[PIPEWR]);

    pthread_exit(NULL);
}

/**
 * Destination thread main method.
 * 
 * Manually copies the standard output of process C to the standard output.
 */

void* dstThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;

    char c;
    while(read(pipeset->Cout[PIPERD], &c, 1) > 0){
        write(1, &c, 1);
    }

    pthread_exit(NULL);
}

/**
 * Set close on exec flag on given descriptor.
 */

void setCloExecFlag(int fd){
    fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
}

/**
 * Set close on exec flag on given descriptor.
 */

void unsetCloExecFlag(int fd){
    fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) & ~FD_CLOEXEC);
}

/**
 * Pipe4.
 * 
 * Create a pipe with some ends possibly marked close-on-exec.
 */

#define PIPE4_FLAG_NONE       (0U)
#define PIPE4_FLAG_RD_CLOEXEC (1U << 0)
#define PIPE4_FLAG_WR_CLOEXEC (1U << 1)

int pipe4(int fd[2], int flags){
    int ret = pipe(fd);

    if(flags&PIPE4_FLAG_RD_CLOEXEC){setCloExecFlag(fd[PIPERD]);}
    if(flags&PIPE4_FLAG_WR_CLOEXEC){setCloExecFlag(fd[PIPEWR]);}

    return ret;
}

/**
 * Pipe4 explicit derivatives.
 */

#define pipe4_cloexec(fd)  pipe4((fd), PIPE4_FLAG_RD_CLOEXEC|PIPE4_FLAG_WR_CLOEXEC)

/**
 * Popen4.
 * 
 * General-case for spawning a process and tethering it with cloexec pipes on stdin,
 * stdout and stderr.
 * 
 * @param [in]      cmd    The command to execute.
 * @param [in/out]  pin    The pointer to the cloexec pipe for stdin.
 * @param [in/out]  pout   The pointer to the cloexec pipe for stdout.
 * @param [in/out]  perr   The pointer to the cloexec pipe for stderr.
 * @param [in]      flags  A bitwise OR of flags to this function. Available
 *                         flags are:
 * 
 *     POPEN4_FLAG_NONE:
 *         Explicitly specify no flags.
 *     POPEN4_FLAG_NOCLOSE_PARENT_STDIN,
 *     POPEN4_FLAG_NOCLOSE_PARENT_STDOUT,
 *     POPEN4_FLAG_NOCLOSE_PARENT_STDERR:
 *         Don't close pin[PIPERD], pout[PIPEWR] and perr[PIPEWR] in the parent,
 *         respectively.
 *     POPEN4_FLAG_CLOSE_CHILD_STDIN,
 *     POPEN4_FLAG_CLOSE_CHILD_STDOUT,
 *     POPEN4_FLAG_CLOSE_CHILD_STDERR:
 *         Close the respective streams in the child. Ignores pin, pout and perr
 *         entirely. Overrides a NOCLOSE_PARENT flag for the same stream.
 */

#define POPEN4_FLAG_NONE                             (0U)
#define POPEN4_FLAG_NOCLOSE_PARENT_STDIN        (1U << 0)
#define POPEN4_FLAG_NOCLOSE_PARENT_STDOUT       (1U << 1)
#define POPEN4_FLAG_NOCLOSE_PARENT_STDERR       (1U << 2)
#define POPEN4_FLAG_CLOSE_CHILD_STDIN           (1U << 3)
#define POPEN4_FLAG_CLOSE_CHILD_STDOUT          (1U << 4)
#define POPEN4_FLAG_CLOSE_CHILD_STDERR          (1U << 5)

pid_t popen4(const char* cmd, int pin[2], int pout[2], int perr[2], int flags){
    /********************
     **  FORK PROCESS  **
     ********************/
    pid_t ret = fork();

    if(ret < 0){
        /**
         * Error in fork(), still in parent.
         */

        fprintf(stderr, "fork() failed!\n");
        return ret;
    }else if(ret == 0){
        /**
         * Child-side of fork
         */

        if(flags & POPEN4_FLAG_CLOSE_CHILD_STDIN){
            close(0);
        }else{
            unsetCloExecFlag(pin [PIPERD]);
            dup2(pin [PIPERD], 0);
        }
        if(flags & POPEN4_FLAG_CLOSE_CHILD_STDOUT){
            close(1);
        }else{
            unsetCloExecFlag(pout[PIPEWR]);
            dup2(pout[PIPEWR], 1);
        }
        if(flags & POPEN4_FLAG_CLOSE_CHILD_STDERR){
            close(2);
        }else{
            unsetCloExecFlag(perr[PIPEWR]);
            dup2(perr[PIPEWR], 2);
        }

        execl("/bin/sh", "sh", "-c", cmd, NULL);

        fprintf(stderr, "exec() failed!\n");
        exit(-1);
    }else{
        /**
         * Parent-side of fork
         */

        if(~flags & POPEN4_FLAG_NOCLOSE_PARENT_STDIN  &&
           ~flags & POPEN4_FLAG_CLOSE_CHILD_STDIN){
            close(pin [PIPERD]);
        }
        if(~flags & POPEN4_FLAG_NOCLOSE_PARENT_STDOUT &&
           ~flags & POPEN4_FLAG_CLOSE_CHILD_STDOUT){
            close(pout[PIPEWR]);
        }
        if(~flags & POPEN4_FLAG_NOCLOSE_PARENT_STDERR &&
           ~flags & POPEN4_FLAG_CLOSE_CHILD_STDERR){
            close(perr[PIPEWR]);
        }

        return ret;
    }

    /* Unreachable */
    return ret;
}

/**
 * Main Function.
 * 
 * Sets up the whole piping scheme.
 */

int main(int argc, char* argv[]){
    pthread_t srcThrd, a2xor0Thrd, xor0Thrd, xor1Thrd, xor22BThrd, dstThrd;
    pid_t     gzip, gunzip, cat;
    PIPESET   pipeset;

    pipe4_cloexec(pipeset.Ain);
    pipe4_cloexec(pipeset.Aout);
    pipe4_cloexec(pipeset.Aerr);
    pipe4_cloexec(pipeset.Bin);
    pipe4_cloexec(pipeset.BoutCin);
    pipe4_cloexec(pipeset.Berr);
    pipe4_cloexec(pipeset.Cout);
    pipe4_cloexec(pipeset.Cerr);
    pipe4_cloexec(pipeset.xor0);
    pipe4_cloexec(pipeset.xor1);
    pipe4_cloexec(pipeset.xor2);

    /* Spawn processes */
    gzip   = popen4("gzip -c -",   pipeset.Ain,     pipeset.Aout,    pipeset.Aerr, POPEN4_FLAG_NONE);
    gunzip = popen4("gunzip -c -", pipeset.Bin,     pipeset.BoutCin, pipeset.Berr, POPEN4_FLAG_NONE);
    cat    = popen4("cat",         pipeset.BoutCin, pipeset.Cout,    pipeset.Cerr, POPEN4_FLAG_NONE);


    /* Spawn threads */
    pthread_create(&srcThrd,    NULL, srcThrdMain,    &pipeset);
    pthread_create(&a2xor0Thrd, NULL, a2xor0ThrdMain, &pipeset);
    pthread_create(&xor0Thrd,   NULL, xor0ThrdMain,   &pipeset);
    pthread_create(&xor1Thrd,   NULL, xor1ThrdMain,   &pipeset);
    pthread_create(&xor22BThrd, NULL, xor22BThrdMain, &pipeset);
    pthread_create(&dstThrd,    NULL, dstThrdMain,    &pipeset);
    pthread_join(srcThrd,    (void**)NULL);
    pthread_join(a2xor0Thrd, (void**)NULL);
    pthread_join(xor0Thrd,   (void**)NULL);
    pthread_join(xor1Thrd,   (void**)NULL);
    pthread_join(xor22BThrd, (void**)NULL);
    pthread_join(dstThrd,    (void**)NULL);
    return 0;
}

对自己的代码进行注释

您的代码存在很多问题,其中大多数与线程无关。

  • 你不close()文件描述符d->gunzip_ptr->in。这意味着gunzip永远无法知道其上不再有任何输入stdin,所以它永远不会退出。
  • Since gunzip永远不会退出,永远不会close()它的标准输出,因此是一个阻塞read()另一端永远不会解锁。非阻塞读取将始终给出-1, with errno == EAGAIN.
  • Your popen3()close() p_stdin[POPEN3_READ], p_stdout[POPEN3_WRITE] or p_stderr[POPEN3_WRITE]在父端fork()。只有孩子才应该拥有这些描述符。未能关闭这些意味着当父进程本身尝试读取子进程的 stdout 和 stderr 时,它永远不会看到 EOF,同样出于与上面相同的原因:因为它本身仍然拥有一个可以在其中写入的写入结束管道,使新的数据出现在读端。
  • Your code implicitly relies on gunzip writing out at least one byte for every 1024 you write in. There is no guarantee that this will be the case, since gunzip may, at its leisure, buffer internally.
    • 这是因为您的代码最多读取然后复制块BUF_LENGTH_VALUE字节到d->in_buf。然后你分配你读过的字节数fread() to d->n_in_bytes。这都儿一样d->n_in_bytes用于你的write()打电话写信给gunzip的标准输入。然后你发出信号consume_gunzip_chunk()醒来,然后pthread_cond_wait()代表下一个 gzip 压缩块。但是这个 gzip 压缩的块可能永远不会出现,因为不能保证gunzip将能够从输入的前 1024 个字节中解压缩有用的输出,甚至不能保证它会write()输出而不是缓冲它,直到它有 4096 字节(一整页)的输出。因此,read()打电话进来consume_gunzip_chunk()可能永远不会成功(甚至返回,如果read()被阻塞)。而如果read()永远不会回来,那么consume_gunzip_chunk()没有信号d->in_cond,所以三个线程都被卡住了。并且即使read()是非阻塞的,gzip 的最后一个输出块可能永远不会出现,因为gzip的输入永远不会关闭,因此它不会通过以下方式刷新其缓冲区write()把它们拿出来,所以read()另一端永远不会获得有用的数据,并且没有任何恳求就无法引出它close().
  • 错误的可能(可能?)原因: d->n_out_bytes_read_from_gunzip,一旦它变成非0,永远不会成为0再次。这意味着极其令人费解的

    while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
        pthread_cond_wait(&d->in_cond, &d->in_lock);
    

    within produce_gzip_chunk()将会,一旦输入d->n_out_bytes_read_from_gunzip != 0,永远被卡住。通过致电sleep(1) within consume_gunzip_chunk(),其中设置d->n_out_bytes_read_from_gunzip,您可能已经通过阅读之前的所有输入解决了问题consume_gunzip_chunk()可以通过设置锁定系统d->n_out_bytes_read_from_gunzip为非零值。

  • two调用的线程pthread_cond_wait(&d->in_cond, &d->in_lock);,这些是produce_gzip_chunk() and consume_gzip_chunk()。绝对不能保证什么时候consume_gunzip_chunk() calls pthread_cond_signal(&d->in_cond);,“正确”的线程(无论是在您的设计中)将接收信号。为了确保他们所有人都会,使用pthread_cond_broadcast(),但随后你将自己暴露在雷群问题 http://en.wikipedia.org/wiki/Thundering_herd_problem。需要使用pthread_cond_broadcast()在我看来,这种情况又是糟糕设计的症状。
  • 相关,您拨打pthread_cond_signal(&d->in_cond)在一个线程内(实际上,一个function),你在其中调用pthread_cond_wait(&d->in_cond, &d->in_lock)。这样做有什么目的?
  • You use d->in_lock出于太多不同的目的,您可能会面临死锁的可能性,或者由于过度保护而导致性能低下。特别是您使用它作为对两者的保护d->in_cond and d->out_cond。这保护太强了——输出gunzip into d->in_line应该能够与输入同时发生gunzip被写入和写入d->in_buf.
  • Within consume_gunzip_chunk(), 你有

    while (d->n_in_bytes_written_to_gunzip == 0) {
        pthread_cond_wait(&d->out_cond, &d->in_lock);
    }
    if (d->n_in_bytes_written_to_gunzip) {
        ...
    

    This if永远不会失败!您可能想到了一个案例吗?

  • 考虑制作整个struct pthread_data易失性(或者至少是由多个线程使用的那些整数元素),因为编译器可能决定优化实际上应该保留的加载和存储。

赞美

为了不听起来太消极,我想说,一般来说,您的问题不是由于滥用 pthreads API 而是由于错误的消费者-生产者逻辑和缺乏close()s。此外,您似乎明白pthread_cond_wait()可能会醒来虚假地 http://en.wikipedia.org/wiki/Spurious_wakeup,因此您已将其包装在一个检查不变量的循环中。

将来:

我会使用管道,甚至在线程之间也是如此。这使您无需实施自己的消费者-生产者方案;内核已经为你解决了这个问题,并为你提供了pipe(), read() and write()原语,这是您利用这个现成解决方案所需的全部内容。它还使代码更干净并且没有互斥体和条件变量。人们必须勤奋地封闭两端,并且在管道周围必须极其小心fork()。规则很简单:

  • 如果管道的写端存在,则read()在开放的读端不会给出 EOF 但会阻塞或EAGAIN.
  • 如果管道的写端全部关闭,read()在开放读端将给出 EOF。
  • 如果管道的读取端全部关闭,write()到它的任何写端都会导致SIGPIPE.
  • fork()重复整个过程,包括all描述符(模可能是疯狂的东西pthread_atfork())!
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何通过pthreads管理两个或多个消费者? 的相关文章

随机推荐

  • 保存散点图动画

    我一直在尝试使用 matplotlib 保存动画散点图 并且我希望它不需要完全不同的代码来查看动画图形和保存副本 该图完美显示了保存完成后的所有数据点 这段代码是修改后的版本Giggi s https stackoverflow com a
  • 在 Javascript 中获取不带模 (%) 运算符的余数,占 -/+ 符号

    对于家庭作业 我需要返回 num1 除以 num2 后的余数 而不使用内置模 运算符 我可以使用以下代码让大多数测试通过 但我一直不知道如何解释给定数字的 符号 我需要保留 num1 上的任何一个符号 并且如果 num2 为负数 还返回一个
  • springdoc-openapi:如何添加POST请求的示例?

    Controller有以下方法 ApiResponses value ApiResponse responseCode 200 GetMapping value API URI PREFIX PRODUCTS URI produces Me
  • Linux:通过网络进行屏幕桌面视频捕获和 VNC 帧速率

    抱歉 文字墙很长 TL DR VNC 连接的帧速率是多少 以帧 秒为单位 或者更确切地说 由谁决定 客户端还是服务器 对于桌面屏幕捕获的任何其他建议 但 正确的时间编码 具有不抖动的帧速率 具有稳定的周期 并有可能将其作为未压缩 或无损 图
  • 从 PDF 等二进制文件中读取文本

    我在 C 中读取二进制文件时遇到问题 目前我的代码是这样的 FILE s fopen source rb fseek s 0 SEEK END size file size ftell s rewind s char sbuffer cha
  • 在 Codeigniter 中加载 javascript

    对于我们的 Code Igniter 应用程序 我们在结束 body 标记之前加载所有 javascript 所以在我们的控制器中我们有 this gt load gt view head this gt head this gt load
  • C#、DLL 导入 API 在 VS2012 .NET Framework 4.5 中无法正常工作

    我的 WinForms 项目有一个问题 该项目是在 VS2005 NET Framework 2 0 中创建的 我刚刚将其升级到 VS2012 NET Framework 4 5 在我的项目中 我使用了第三方DLLDllImport并使用它
  • 对 Windows 窗体控件进行线程安全调用

    MSDN 文章 如何 对 Windows 窗体控件进行线程安全调用 http msdn microsoft com en us library ms171728 aspx说我们应该使用异步委托来进行调用 但为什么异步委托可以使调用安全呢 W
  • 为什么 localhost:5000 在 Flask 中不起作用?

    我正在使用 Flask 应用程序工厂模式 并且有这个 run py 文件 from app import create app app create app if name main app run host localhost debug
  • iPhone:AVAudioPlayer 不支持的文件类型

    我的应用程序从我们的服务器下载 mp3 并将其播放给用户 该文件为 64 kbps 如果我理解正确的话 这完全在 iPhone 的可接受范围内 我在几十个网站上查找了如何执行此操作 他们都建议我这样做 NSData data NSData
  • 在 Dart 中查找和替换字符串

    我正在为这个应用程序使用 flutter 但我在应用程序的逻辑方面遇到了问题 任何帮助深表感谢 应用程序目标 通过以下方式将所有输入缩写解码 替换 为单词 用户通过文本框输入文本 应用程序查找任何缩写 几个 并仅用文本替换缩写 我能够使用一
  • 在 Objective-C (iPhone) 中从 Excel 文件读取数据 [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我在 google 中搜索过 但似乎没有找到从 Objective C 读取 Excel 文件的方法 我找到的唯一答案是首先转换为 CSV
  • Ifelse 只返回列表的第一个元素

    我有两个清单 list1 lt list x c 1 2 3 y c 4 5 6 list1 x 1 1 2 3 y 1 4 5 6 list2 lt list x c 1 2 3 y c 4 5 6 z c 7 8 9 list2 x 1
  • C++中字符串如何分配内存?

    我知道动态内存比设置固定大小的数组并使用其中的一部分具有优势 但在动态内存中 您必须输入要在数组中存储的数据量 使用字符串时 您可以输入任意数量的字母 您甚至可以使用字符串代替数字 然后使用函数来转换它们 这一事实让我认为字符数组的动态内存
  • string.Equals (c#) 的区域性参数何时真正产生影响的示例?

    我不完全理解 string Equals 的第二个参数 这是因为我找不到任何例子来说明它何时会真正产生影响 例如 无论第二个参数的值如何 除了 IgnoreCase 这里给出的示例都是相同的 http msdn microsoft com
  • 在 C 中打印指针

    我试图用指针来理解一些东西 所以我写了这段代码 include
  • 如何使用json传递opentracing数据

    我的 API 网关启动一个跟踪器和一个用于验证电子邮件的范围 然后它传递给user service用于验证 我想通过这个span详情至user service作为 json 对象并启动另一个span as a tracer start sp
  • 如何让 geom_vline 尊重facet_wrap?

    我四处搜寻 但无法找到答案 我想做一个加权 geom bar 图 上面覆盖有一条垂直线 显示每个方面的总体加权平均值 我无法让这件事发生 垂直线似乎是应用于所有方面的单一值 require ggplot2 require plyr data
  • main()函数在C++中调用自身,会发生什么? [复制]

    这个问题在这里已经有答案了 include
  • 如何通过pthreads管理两个或多个消费者?

    我有一个正在寻求解决的通用问题 即从标准输入或常规文件流发送到应用程序的二进制数据块 应用程序又将二进制数据转换为文本 使用线程 我想在将文本传输到下一个应用程序之前对其进行处理 该应用程序会进一步修改该文本 依此类推 作为一个简单的测试用