rdkafka线程过多_Kafka快速入门(十一)——RdKafka源码分析

2023-11-16

Kafka快速入门(十一)——RdKafka源码分析

一、RdKafka C源码分析

1、Kafka OP队列

RdKafka将与Kafka Broke的交互、内部实现的操作都封装成Operator结构,然后放入OP处理队列里统一处理。Kafka OP队列是线程间通信的管道。

RdKafka队列定义在rdkafka_queue.h文件中,队列相关操作封装在rdsysqueue.h文件中。

(1)Kafka OP队列

typedef struct rd_kafka_q_s rd_kafka_q_t;

struct rd_kafka_q_s

{

mtx_t rkq_lock;// 队列操作加锁

cnd_t rkq_cond; // 队列中放入新元素时, 用条件变量唤醒相应等待线程

struct rd_kafka_q_s *rkq_fwdq; // Forwarded/Routed queue

struct rd_kafka_op_tailq rkq_q; // 放入队列的操作所存储的队列

int rkq_qlen; /* Number of entries in queue */

int64_t rkq_qsize; /* Size of all entries in queue */

int rkq_refcnt; // 引用计数

int rkq_flags; // 当前队列的状态

rd_kafka_t *rkq_rk;// 队列关联的Kafka Handler对象

struct rd_kafka_q_io *rkq_qio; //队列中放入新元素时,向fd写入数据唤醒等待线程

rd_kafka_q_serve_cb_t *rkq_serve; // 队列中的操作被执行时所执行的回调函数

void *rkq_opaque;

const char *rkq_name; // queue name

};

// Kafka Operator队列,对外接口

typedef struct rd_kafka_queue_s rd_kafka_queue_t;

struct rd_kafka_queue_s

{

rd_kafka_q_t *rkqu_q;// Kafka OP 队列

rd_kafka_t *rkqu_rk;// 队列关联的Kafka Handler

int rkqu_is_owner;

};

rd_kafka_queue_t *rd_kafka_queue_new (rd_kafka_t *rk)

{

rd_kafka_q_t *rkq;

rd_kafka_queue_t *rkqu;

rkq = rd_kafka_q_new(rk);

rkqu = rd_kafka_queue_new0(rk, rkq);

rd_kafka_q_destroy(rkq);

return rkqu;

}

创建OP队列

rd_kafka_queue_t *rd_kafka_queue_get_main (rd_kafka_t *rk)

{

return rd_kafka_queue_new0(rk, rk->rk_rep);

}

获取RdKafka与应用程序交互使用的OP队列

rd_kafka_queue_t *rd_kafka_queue_get_consumer (rd_kafka_t *rk) {

if (!rk->rk_cgrp)

return NULL;

return rd_kafka_queue_new0(rk, rk->rk_cgrp->rkcg_q);

}

获取消费者的OP队列

rd_kafka_queue_t *rd_kafka_queue_get_partition (rd_kafka_t *rk,

const char *topic,

int32_t partition) {

shptr_rd_kafka_toppar_t *s_rktp;

rd_kafka_toppar_t *rktp;

rd_kafka_queue_t *result;

if (rk->rk_type == RD_KAFKA_PRODUCER)

return NULL;

s_rktp = rd_kafka_toppar_get2(rk, topic,

partition,

0, /* no ua_on_miss */

1 /* create_on_miss */);

if (!s_rktp)

return NULL;

rktp = rd_kafka_toppar_s2i(s_rktp);

result = rd_kafka_queue_new0(rk, rktp->rktp_fetchq);

rd_kafka_toppar_destroy(s_rktp);

return result;

}

获取Topic的分区的OP队列

rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, rd_ts_t timeout_us,

int32_t version,

rd_kafka_q_cb_type_t cb_type,

rd_kafka_q_serve_cb_t *callback,

void *opaque);

处理OP队列中的一个OP操作,按version过滤的可处理OP,没有则等待,如果超时,函数退出。

int rd_kafka_q_serve (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt,

rd_kafka_q_cb_type_t cb_type,

rd_kafka_q_serve_cb_t *callback,

void *opaque);

批量处理OP队列的OP

int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms,

rd_kafka_message_t **rkmessages,

size_t rkmessages_size);

处理RD_KAFKA_OP_FETCH OP操作

int rd_kafka_q_purge0 (rd_kafka_q_t *rkq, int do_lock);

#define rd_kafka_q_purge(rkq) rd_kafka_q_purge0(rkq, 1/*lock*/)

清除OP队列中的所有OP操作

rd_kafka_queue_t *rd_kafka_queue_get_background (rd_kafka_t *rk);

获取Kafka Handle的后台OP队列

2、Kafka OP操作

RaKafka OP操作封装在rdkafka_op.h文件中。

typedef enum

{

RD_KAFKA_OP_NONE, // 未指定类型

RD_KAFKA_OP_FETCH, // Kafka thread -> Application

RD_KAFKA_OP_ERR, // Kafka thread -> Application

RD_KAFKA_OP_CONSUMER_ERR, // Kafka thread -> Application

RD_KAFKA_OP_DR, // Kafka thread->Application:Produce message delivery report

RD_KAFKA_OP_STATS, // Kafka thread -> Application

RD_KAFKA_OP_OFFSET_COMMIT, // any -> toppar's Broker thread

RD_KAFKA_OP_NODE_UPDATE, // any -> Broker thread: node update

RD_KAFKA_OP_XMIT_BUF, // transmit buffer: any -> broker thread

RD_KAFKA_OP_RECV_BUF, // received response buffer: broker thr -> any

RD_KAFKA_OP_XMIT_RETRY, // retry buffer xmit: any -> broker thread

RD_KAFKA_OP_FETCH_START, // Application -> toppar's handler thread

RD_KAFKA_OP_FETCH_STOP, // Application -> toppar's handler thread

RD_KAFKA_OP_SEEK, // Application -> toppar's handler thread

RD_KAFKA_OP_PAUSE, // Application -> toppar's handler thread

RD_KAFKA_OP_OFFSET_FETCH, // Broker->broker thread: fetch offsets for topic

RD_KAFKA_OP_PARTITION_JOIN, // cgrp op:add toppar to cgrp,broker op:add toppar to broker

RD_KAFKA_OP_PARTITION_LEAVE, // cgrp op:remove toppar from cgrp,broker op:remove toppar from rkb

RD_KAFKA_OP_REBALANCE, // broker thread -> app:group rebalance

RD_KAFKA_OP_TERMINATE, // For generic use

RD_KAFKA_OP_COORD_QUERY, // Query for coordinator

RD_KAFKA_OP_SUBSCRIBE, // New subscription

RD_KAFKA_OP_ASSIGN, // New assignment

RD_KAFKA_OP_GET_SUBSCRIPTION,// Get current subscription Reuses u.subscribe

RD_KAFKA_OP_GET_ASSIGNMENT, // Get current assignment Reuses u.assign

RD_KAFKA_OP_THROTTLE, // Throttle info

RD_KAFKA_OP_NAME, // Request name

RD_KAFKA_OP_OFFSET_RESET, // Offset reset

RD_KAFKA_OP_METADATA, // Metadata response

RD_KAFKA_OP_LOG, // Log

RD_KAFKA_OP_WAKEUP, // Wake-up signaling

RD_KAFKA_OP_CREATETOPICS, // Admin: CreateTopics: u.admin_request

RD_KAFKA_OP_DELETETOPICS, // Admin: DeleteTopics: u.admin_request

RD_KAFKA_OP_CREATEPARTITIONS,// Admin: CreatePartitions: u.admin_request

RD_KAFKA_OP_ALTERCONFIGS, // Admin: AlterConfigs: u.admin_request

RD_KAFKA_OP_DESCRIBECONFIGS, // Admin: DescribeConfigs: u.admin_request

RD_KAFKA_OP_ADMIN_RESULT, // Admin API .._result_t

RD_KAFKA_OP_PURGE, // Purge queues

RD_KAFKA_OP_CONNECT, // Connect (to broker)

RD_KAFKA_OP_OAUTHBEARER_REFRESH, // Refresh OAUTHBEARER token

RD_KAFKA_OP_MOCK, // Mock cluster command

RD_KAFKA_OP_BROKER_MONITOR, // Broker state change

RD_KAFKA_OP_TXN, // Transaction command

RD_KAFKA_OP__END // 操作结束符

} rd_kafka_op_type_t;

rd_kafka_op_type_t枚举类型定义了RaKafka 所有OP操作类型。

typedef enum

{

RD_KAFKA_PRIO_NORMAL = 0, // 正常优先级

RD_KAFKA_PRIO_MEDIUM, // 中级

RD_KAFKA_PRIO_HIGH, // 高级

RD_KAFKA_PRIO_FLASH // 最高优先级:立即

} rd_kafka_prio_t;

rd_kafka_prio_t枚举类型定义了Kafka OP操作的所有优先级。

typedef enum

{

RD_KAFKA_OP_RES_PASS, // Not handled, pass to caller

RD_KAFKA_OP_RES_HANDLED, // Op was handled (through callbacks)

RD_KAFKA_OP_RES_KEEP, // Op已经被回调函数处理,但禁止被op_handle()销毁

RD_KAFKA_OP_RES_YIELD // Callback called yield

} rd_kafka_op_res_t;

rd_kafka_op_res_t枚举类型定义了OP被处理后的返回结果类型,

如果返回RD_KAFKA_OP_RES_YIELD,handler处理函数需要确定是否需要将OP重新入队列还是将OP销毁。

typedef enum

{

RD_KAFKA_Q_CB_INVALID, // 非法,未使用

RD_KAFKA_Q_CB_CALLBACK, // 基于OP触发回调函数

RD_KAFKA_Q_CB_RETURN, // 返回OP而不是触发回调函数

RD_KAFKA_Q_CB_FORCE_RETURN, // 无论是否触发回调函数都返回OP

RD_KAFKA_Q_CB_EVENT // 返回Event OP而不是触发回调函数

} rd_kafka_q_cb_type_t;

rd_kafka_q_cb_type_t枚举类型定义了OP队列中OP操作执行回调函数的所有类型。

OP队列执行回调函数类型定义如下:

typedef rd_kafka_op_res_t

(rd_kafka_q_serve_cb_t) (rd_kafka_t *rk,

struct rd_kafka_q_s *rkq,

struct rd_kafka_op_s *rko,

rd_kafka_q_cb_type_t cb_type, void *opaque);

OP回调函数定义如下:

typedef rd_kafka_op_res_t (rd_kafka_op_cb_t) (rd_kafka_t *rk,

rd_kafka_q_t *rkq,

struct rd_kafka_op_s *rko);

OP执行结果数据结构定义如下:

typedef struct rd_kafka_replyq_s

{

rd_kafka_q_t *q;// OP执行结果存储队列

int32_t version;// 版本

} rd_kafka_replyq_t;

Kafka OP数据结构定义如下:

struct rd_kafka_op_s

{

TAILQ_ENTRY(rd_kafka_op_s) rko_link;// 增加TAILQ字段

rd_kafka_op_type_t rko_type; // OP类型

rd_kafka_event_type_t rko_evtype;// Event类型

int rko_flags; // OP标识

int32_t rko_version;// 版本

rd_kafka_resp_err_t rko_err;//

int32_t rko_len; //

rd_kafka_prio_t rko_prio; // OP优先级

shptr_rd_kafka_toppar_t *rko_rktp;// 关联TopicPartition

rd_kafka_replyq_t rko_replyq;//

rd_kafka_q_serve_cb_t *rko_serve;// OP队列回调函数

void *rko_serve_opaque;// OP队列回调函数参数

rd_kafka_t *rko_rk;// Kafka Handle

rd_kafka_op_cb_t *rko_op_cb; // OP回调函数

union

{

struct

{

rd_kafka_buf_t *rkbuf;

rd_kafka_msg_t rkm;

int evidx;

} fetch;

struct

{

rd_kafka_topic_partition_list_t *partitions;

int do_free; // free .partitions on destroy()

} offset_fetch;

struct

{

rd_kafka_topic_partition_list_t *partitions;

void (*cb) (rd_kafka_t *rk,

rd_kafka_resp_err_t err,

rd_kafka_topic_partition_list_t *offsets,

void *opaque);

void *opaque;

int silent_empty; // Fail silently if there are no offsets to commit.

rd_ts_t ts_timeout;

char *reason;

} offset_commit;

struct

{

rd_kafka_topic_partition_list_t *topics;

} subscribe;

struct

{

rd_kafka_topic_partition_list_t *partitions;

} assign;

struct

{

rd_kafka_topic_partition_list_t *partitions;

} rebalance;

struct

{

char *str;

} name;

struct

{

int64_t offset;

char *errstr;

rd_kafka_msg_t rkm;

int fatal;

} err;

struct

{

int throttle_time;

int32_t nodeid;

char *nodename;

} throttle;

struct

{

char *json;

size_t json_len;

} stats;

struct

{

rd_kafka_buf_t *rkbuf;

} xbuf;

// RD_KAFKA_OP_METADATA

struct

{

rd_kafka_metadata_t *md;

int force; // force request regardless of outstanding metadata requests.

} metadata;

struct

{

shptr_rd_kafka_itopic_t *s_rkt;

rd_kafka_msgq_t msgq;

rd_kafka_msgq_t msgq2;

int do_purge2;

} dr;

struct

{

int32_t nodeid;

char nodename[RD_KAFKA_NODENAME_SIZE];

} node;

struct

{

int64_t offset;

char *reason;

} offset_reset;

struct

{

int64_t offset;

struct rd_kafka_cgrp_s *rkcg;

} fetch_start; // reused for SEEK

struct

{

int pause;

int flag;

} pause;

struct

{

char fac[64];

int level;

char *str;

} log;

struct

{

rd_kafka_AdminOptions_t options;

rd_ts_t abs_timeout; // Absolute timeout

rd_kafka_timer_t tmr; // Timeout timer

struct rd_kafka_enq_once_s *eonce; // 只入队列OP一次,用于触发Broker状态变化的OP请求

rd_list_t args; // Type depends on request, e.g. rd_kafka_NewTopic_t for CreateTopics

rd_kafka_buf_t *reply_buf; // Protocol reply

struct rd_kafka_admin_worker_cbs *cbs;

// Worker state

enum

{

RD_KAFKA_ADMIN_STATE_INIT,

RD_KAFKA_ADMIN_STATE_WAIT_BROKER,

RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER,

RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST,

RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE,

} state;

int32_t broker_id; // Requested broker id to communicate with.

// Application's reply queue

rd_kafka_replyq_t replyq;

rd_kafka_event_type_t reply_event_type;

} admin_request;

struct

{

rd_kafka_op_type_t reqtype; // Request op type

char *errstr; // 错误信息

rd_list_t results; // Type depends on request type:

void *opaque; // Application's opaque as set by rd_kafka_AdminOptions_set_opaque

} admin_result;

struct

{

int flags; // purge_flags from rd_kafka_purge()

} purge;

// Mock cluster command

struct

{

enum

{

RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR,

RD_KAFKA_MOCK_CMD_TOPIC_CREATE,

RD_KAFKA_MOCK_CMD_PART_SET_LEADER,

RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER,

RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS,

RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN,

RD_KAFKA_MOCK_CMD_BROKER_SET_RACK,

RD_KAFKA_MOCK_CMD_COORD_SET,

RD_KAFKA_MOCK_CMD_APIVERSION_SET,

} cmd;

rd_kafka_resp_err_t err; // Error for:TOPIC_SET_ERROR

char *name; // For:TOPIC_SET_ERROR,TOPIC_CREATE,PART_SET_FOLLOWER,PART_SET_FOLLOWER_WMARKS,BROKER_SET_RACK,COORD_SET (key_type)

char *str; // For:COORD_SET (key)

int32_t partition; // For:PART_SET_FOLLOWER,PART_SET_FOLLOWER_WMARKS,PART_SET_LEADER,APIVERSION_SET (ApiKey)

int32_t broker_id; // For:PART_SET_FOLLOWER,PART_SET_LEADER,BROKER_SET_UPDOWN,BROKER_SET_RACK,COORD_SET

int64_t lo; // Low offset, for:TOPIC_CREATE (part cnt),PART_SET_FOLLOWER_WMARKS,BROKER_SET_UPDOWN, APIVERSION_SET (minver);

int64_t hi; // High offset, for:TOPIC_CREATE (repl fact),PART_SET_FOLLOWER_WMARKS,APIVERSION_SET (maxver)

} mock;

struct

{

struct rd_kafka_broker_s *rkb; // 状态变化的Broker

void (*cb) (struct rd_kafka_broker_s *rkb);// 要在OP处理线程触发的回调函数

} broker_monitor;

struct

{

rd_kafka_error_t *error; // 错误对象

char *group_id; // 要提交位移的消费者组ID

int timeout_ms; /**< Operation timeout */

rd_ts_t abs_timeout; /**< Absolute time */

rd_kafka_topic_partition_list_t *offsets;// 要提交的位移

} txn;

} rko_u;

};

typedef struct rd_kafka_op_s rd_kafka_event_t;

const char *rd_kafka_op2str (rd_kafka_op_type_t type);

返回OP类型的相应字符串

void rd_kafka_op_destroy (rd_kafka_op_t *rko);

销毁OP对象

rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type);

#define rd_kafka_op_new(type) rd_kafka_op_new0(NULL, type)

生成OP对象

rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk,

rd_kafka_q_t *rkq, rd_kafka_op_t *rko);

调用OP的回调函数

rd_kafka_op_res_t rd_kafka_op_handle_std (rd_kafka_t *rk, rd_kafka_q_t *rkq,

rd_kafka_op_t *rko, int cb_type)

{

if (cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)

return RD_KAFKA_OP_RES_PASS;

else if (unlikely(rd_kafka_op_is_ctrl_msg(rko)))

{

rd_kafka_op_offset_store(rk, rko);

return RD_KAFKA_OP_RES_HANDLED;

}

else if (cb_type != RD_KAFKA_Q_CB_EVENT &&

rko->rko_type & RD_KAFKA_OP_CB)

return rd_kafka_op_call(rk, rkq, rko);

else if (rko->rko_type == RD_KAFKA_OP_RECV_BUF)

rd_kafka_buf_handle_op(rko, rko->rko_err);

else if (cb_type != RD_KAFKA_Q_CB_RETURN &&

rko->rko_type & RD_KAFKA_OP_REPLY &&

rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)

return RD_KAFKA_OP_RES_HANDLED;

else

return RD_KAFKA_OP_RES_PASS;

return RD_KAFKA_OP_RES_HANDLED;

}

对OP进行标准化处理

rd_kafka_op_res_t

rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,

rd_kafka_q_cb_type_t cb_type, void *opaque,

rd_kafka_q_serve_cb_t *callback)

{

rd_kafka_op_res_t res;

if (rko->rko_serve)

{

callback = rko->rko_serve;

opaque = rko->rko_serve_opaque;

rko->rko_serve = NULL;

rko->rko_serve_opaque = NULL;

}

res = rd_kafka_op_handle_std(rk, rkq, rko, cb_type);

if (res == RD_KAFKA_OP_RES_KEEP)

{

return res;

}

if (res == RD_KAFKA_OP_RES_HANDLED)

{

rd_kafka_op_destroy(rko);

return res;

}

else if (unlikely(res == RD_KAFKA_OP_RES_YIELD))

return res;

if (callback)

res = callback(rk, rkq, rko, cb_type, opaque);

return res;

}

处理OP

3、Kafka Message

rd_kafka_message_t定义在rdkafka.h文件:

typedef struct rd_kafka_message_s

{

rd_kafka_resp_err_t err; // 非0表示错误消息

rd_kafka_topic_t *rkt; // 关联Topic

int32_t partition; // 分区

void *payload; // 消息数据

size_t len; // err为0表示消息数据长度,非0表示错误信息长度

void *key; // err为0表示消息key

size_t key_len; // err为0表示消息key的长度

int64_t offset; // 位移

void *_private; // 对Consumer,为RdKafka私有指针;对于Producer,为dr_msg_cb

} rd_kafka_message_t;

Kafka Producer生产的数据在application层调用接口后最终会将数据封装成rd_kafka_message_t结构,Consumer从Broker消费的数据回调给application层时也会封装成rd_kafka_message_t结构。

rd_kafka_msg_t和rd_kafka_msgq_t定义在rdkafka_msg.h文件:

typedef struct rd_kafka_msg_s

{

rd_kafka_message_t rkm_rkmessage; // Kafka 消息,必须时第一个字段

TAILQ_ENTRY(rd_kafka_msg_s) rkm_link;// 增加TAILQ字段

int rkm_flags; // 消息类型标识

rd_kafka_timestamp_type_t rkm_tstype; // 消息时间戳

int64_t rkm_timestamp;// V1消息格式的时间戳

rd_kafka_headers_t *rkm_headers;

rd_kafka_msg_status_t rkm_status; // 消息持久化状态

union

{

struct

{

rd_ts_t ts_timeout; // 消息超时

rd_ts_t ts_enq; // 入队列或生产消息时间戳

rd_ts_t ts_backoff;

uint64_t msgid; // 用于保序的消息ID,从1开始

uint64_t last_msgid; //

int retries; // 重试次数

} producer;

#define rkm_ts_timeout rkm_u.producer.ts_timeout

#define rkm_ts_enq rkm_u.producer.ts_enq

#define rkm_msgid rkm_u.producer.msgid

struct

{

rd_kafkap_bytes_t binhdrs;

} consumer;

} rkm_u;

} rd_kafka_msg_t;

TAILQ_HEAD(rd_kafka_msgs_head_s, rd_kafka_msg_s);

typedef struct rd_kafka_msgq_s {

struct rd_kafka_msgs_head_s rkmq_msgs; /* TAILQ_HEAD */

int32_t rkmq_msg_cnt;

int64_t rkmq_msg_bytes;

} rd_kafka_msgq_t;

Kafka Message队列

static rd_kafka_msg_t *rd_kafka_message2msg (rd_kafka_message_t *rkmessage) {

return (rd_kafka_msg_t *)rkmessage;

}

将rd_kafka_message_t类型消息转换为rd_kafka_msg_t类型消息

int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,

int msgflags,

char *payload, size_t len,

const void *keydata, size_t keylen,

void *msg_opaque);

创建一条新的Kafka消息并将其入对到相应分区的消息队列。

static void rd_kafka_msgq_concat (rd_kafka_msgq_t *dst,rd_kafka_msgq_t *src);

将src消息队列的所有消息合并到dst消息队列尾部,src会被清空。

static void rd_kafka_msgq_move (rd_kafka_msgq_t *dst,rd_kafka_msgq_t *src);

将src消息队列的所有元素移动到dst消息队列,src会被清空

static void rd_kafka_msgq_purge (rd_kafka_t *rk, rd_kafka_msgq_t *rkmq);

清空Kafka Handle的消息队列

static rd_kafka_msg_t *rd_kafka_msgq_deq (rd_kafka_msgq_t *rkmq,

rd_kafka_msg_t *rkm,

int do_count);

将rkm消息从消息队列rkmq中删除

static rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq);

将rkm消息从消息队列rkmq中删除

int rd_kafka_msgq_enq_sorted (const rd_kafka_itopic_t *rkt,

rd_kafka_msgq_t *rkmq,

rd_kafka_msg_t *rkm);

将rkm消息按照消息ID排序插入rnkmq消息队列

static void rd_kafka_msgq_insert (rd_kafka_msgq_t *rkmq,rd_kafka_msg_t *rkm);

将rkm消息插入消息队列rkmq头部

static int rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq,rd_kafka_msg_t *rkm);

将rkm消息追加到rkmq消息队列

int rd_kafka_msgq_age_scan (struct rd_kafka_toppar_s *rktp,

rd_kafka_msgq_t *rkmq,

rd_kafka_msgq_t *timedout,

rd_ts_t now,

rd_ts_t *abs_next_timeout);

扫描rkmq消息队列,将超时的消息增加到timeout消息队列,并从rkmq消息队列将其删除。

int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,

rd_dolock_t do_lock);

对写入rkt主题的rkm消息进行分区分配

rd_kafka_message_t *rd_kafka_message_get (struct rd_kafka_op_s *rko);

从OP操作提取消息

rd_kafka_message_t *rd_kafka_message_new (void);

创建空的Kafka消息

4、Kafka Topic

Kafka Topic相关封装位于rdkafka_topic.h文件中。

struct rd_kafka_itopic_s

{

TAILQ_ENTRY(rd_kafka_itopic_s) rkt_link;

rd_refcnt_t rkt_refcnt; // 引入计数

rwlock_t rkt_lock;

rd_kafkap_str_t *rkt_topic; // Topic名称

shptr_rd_kafka_toppar_t *rkt_ua; // 未分配分区

shptr_rd_kafka_toppar_t **rkt_p; // 拥有TopicPartition的链表

int32_t rkt_partition_cnt; // 分区计数

rd_list_t rkt_desp;

rd_ts_t rkt_ts_metadata; // 最近更新Meta的时间戳

mtx_t rkt_app_lock;

rd_kafka_topic_t *rkt_app_rkt; // Topic对应用层的指针

int rkt_app_refcnt;

enum

{

RD_KAFKA_TOPIC_S_UNKNOWN,

RD_KAFKA_TOPIC_S_EXISTS,

RD_KAFKA_TOPIC_S_NOTEXISTS,

} rkt_state; // Topic状态

int rkt_flags; //

rd_kafka_t *rkt_rk; // Kafka Handle

rd_avg_t rkt_avg_batchsize;

rd_avg_t rkt_avg_batchcnt;

shptr_rd_kafka_itopic_t *rkt_shptr_app;

rd_kafka_topic_conf_t rkt_conf; // Topic配置

};

shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk, const char *topic,

rd_kafka_topic_conf_t *conf,

int *existing, int do_lock);

创建rd_kafka_itopic_s对象

void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics);

获取当前rd_kafka_t对象持有的所有topic名字,保存在一个rd_list中

void rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now);

扫描Kafka Handle持有的所有topic的所有分区,筛选出未分配分区的超时消息、需要在Broker上创建的Topic、Meta数据太旧需要被更新的Topic、Leader未知的分区。

static int rd_kafka_topic_partition_cnt_update (rd_kafka_itopic_t *rkt,

int32_t partition_cnt);

更新topic的partition个数,如果分区数量有变化,返回1,否则返回0。

rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic,

rd_kafka_topic_conf_t *conf);

创建Topic对象

static void rd_kafka_topic_assign_uas (rd_kafka_itopic_t *rkt,

rd_kafka_resp_err_t err);

分配未分配分区上的消息到可用分区

int rd_kafka_topic_partition_available (const rd_kafka_topic_t *app_rkt,

int32_t partition);

查询Topic的分区是否可用,即分区是否未Leader

5、Kafka TopicPartition

rd_kafka_topic_partition_t定义在rdkafka.h文件中。

typedef struct rd_kafka_topic_partition_s

{

char *topic; // Topic名称

int32_t partition; // 分区

int64_t offset; // 位移

void *metadata; // 元数据

size_t metadata_size;

void *opaque;

rd_kafka_resp_err_t err;

void *_private;

} rd_kafka_topic_partition_t;

typedef struct rd_kafka_topic_partition_list_s {

int cnt; // 当前元数数量

int size; // 分配数组大小

rd_kafka_topic_partition_t *elems; // 数组

} rd_kafka_topic_partition_list_t;

struct rd_kafka_toppar_s

{

TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rklink;

TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rkblink;

CIRCLEQ_ENTRY(rd_kafka_toppar_s) rktp_activelink;

TAILQ_ENTRY(rd_kafka_toppar_s) rktp_rktlink;

TAILQ_ENTRY(rd_kafka_toppar_s) rktp_cgrplink;

TAILQ_ENTRY(rd_kafka_toppar_s) rktp_txnlink;

rd_kafka_itopic_t *rktp_rkt;

shptr_rd_kafka_itopic_t *rktp_s_rkt; // 指向Topic对象

int32_t rktp_partition; // 分区

int32_t rktp_leader_id; // 当前Leader ID

int32_t rktp_broker_id; // 当前Broker ID

rd_kafka_broker_t *rktp_leader; // 当前Leader Broker

rd_kafka_broker_t *rktp_broker; // 当前preferred Broker

rd_kafka_broker_t *rktp_next_broker; // 下一个preferred Broker

rd_refcnt_t rktp_refcnt; // 引用计数

mtx_t rktp_lock;

rd_kafka_q_t *rktp_msgq_wakeup_q; // 唤醒消息队列

rd_kafka_msgq_t rktp_msgq; //

rd_kafka_msgq_t rktp_xmit_msgq;

int rktp_fetch;

rd_kafka_q_t *rktp_fetchq; // 从Broker取消息的队列

rd_kafka_q_t *rktp_ops; // 主线程OP队列

rd_atomic32_t rktp_msgs_inflight;

uint64_t rktp_msgid; // 当前/最新消息ID

struct

{

rd_kafka_pid_t pid;

uint64_t acked_msgid;

uint64_t epoch_base_msgid;

int32_t next_ack_seq;

int32_t next_err_seq;

rd_bool_t wait_drain;

} rktp_eos;

rd_atomic32_t rktp_version; // 最新OP版本

int32_t rktp_op_version; // 从Broker收到的当前命令的OP版本

int32_t rktp_fetch_version; // 当前Fetch的OP版本

enum

{

RD_KAFKA_TOPPAR_FETCH_NONE = 0,

RD_KAFKA_TOPPAR_FETCH_STOPPING,

RD_KAFKA_TOPPAR_FETCH_STOPPED,

RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY,

RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT,

RD_KAFKA_TOPPAR_FETCH_ACTIVE,

} rktp_fetch_state;

int32_t rktp_fetch_msg_max_bytes;

rd_ts_t rktp_ts_fetch_backoff;

int64_t rktp_query_offset;

int64_t rktp_next_offset;

int64_t rktp_last_next_offset;

int64_t rktp_app_offset; //

int64_t rktp_stored_offset; // 最近存储的位移,可能没有提交

int64_t rktp_committing_offset; // 当前正在提交位移

int64_t rktp_committed_offset; // 最新提交位移

rd_ts_t rktp_ts_committed_offset; // 最新提交位移的时间戳

struct offset_stats rktp_offsets; //

struct offset_stats rktp_offsets_fin; //

int64_t rktp_ls_offset; // 当前最新稳定位移

int64_t rktp_hi_offset; // 当前高水位

int64_t rktp_lo_offset; // 当前低水位

rd_ts_t rktp_ts_offset_lag;

char *rktp_offset_path; // 位移文件路径

FILE *rktp_offset_fp; // 位移文件描述符

rd_kafka_cgrp_t *rktp_cgrp;

int rktp_assigned;

rd_kafka_replyq_t rktp_replyq; //

int rktp_flags; // 分区状态

shptr_rd_kafka_toppar_t *rktp_s_for_desp; // rkt_desp链表指针

shptr_rd_kafka_toppar_t *rktp_s_for_cgrp; // rkcg_toppars链表指针

shptr_rd_kafka_toppar_t *rktp_s_for_rkb; // rkb_toppars链表指针

rd_kafka_timer_t rktp_offset_query_tmr; // 位移查询定时器

rd_kafka_timer_t rktp_offset_commit_tmr; // 位移提交定时器

rd_kafka_timer_t rktp_offset_sync_tmr; // 位移文件同步定时器

rd_kafka_timer_t rktp_consumer_lag_tmr; // 消费者滞后监视定时器

rd_interval_t rktp_lease_intvl; // Preferred副本租约

rd_interval_t rktp_new_lease_intvl; // 创建新的Preferred副本租约的间隔

rd_interval_t rktp_new_lease_log_intvl; //

rd_interval_t rktp_metadata_intvl; // Preferred副本的Meta请求的最大频率

int rktp_wait_consumer_lag_resp;

struct rd_kafka_toppar_err rktp_last_err;

struct

{

rd_atomic64_t tx_msgs; // 生产者发送的消息数量

rd_atomic64_t tx_msg_bytes; // 生产者发送的字节数量

rd_atomic64_t rx_msgs; // 消费者接收的消息数量

rd_atomic64_t rx_msg_bytes; // 消费者消费字节数

rd_atomic64_t producer_enq_msgs; // 生产者入对列的消息数量

rd_atomic64_t rx_ver_drops; // 消费者丢弃过期消息数量

} rktp_c;

};

6、Kafka Transport

RdKafka与Broker网络通信不需要支持高并发,因此RdKafka选择了Poll网络IO模型,对transport数据传输层进行了封装。

RdKafka与Kafka Broker间采用TCP连接,因此需要根据Kafka Message协议进行拆包组包:前4个字节是payload长度;payload部分分为header和body两部分,接收数据时先收4字节,即payload长度,再根据payload长度收取payload内容。

rd_kafka_transport_s定义在rdkafka_transport_init.h文件:

struct rd_kafka_transport_s

{

rd_socket_t rktrans_s; // 与Broker通信的Socket fd

rd_kafka_broker_t *rktrans_rkb; // 所连接Broker

struct

{

void *state;

int complete;

struct msghdr msg;

struct iovec iov[2];

char *recv_buf;

int recv_of;

int recv_len;

} rktrans_sasl; // SASL权限验证

rd_kafka_buf_t *rktrans_recv_buf; // 接收数据Buffer

rd_pollfd_t rktrans_pfd[2]; // Poll IO模型的fd:TCP Socket,Wake up fd

int rktrans_pfd_cnt; //

size_t rktrans_rcvbuf_size; // Socket接收数据Buffer大小

size_t rktrans_sndbuf_size; // Socket发送数据Buffer大小

};

typedef struct rd_kafka_transport_s rd_kafka_transport_t;

rd_kafka_transport_t *rd_kafka_transport_connect (rd_kafka_broker_t *rkb,

const rd_sockaddr_inx_t *sinx,

char *errstr,

size_t errstr_size)

{

rd_kafka_transport_t *rktrans;

int s = -1;

int r;

rkb->rkb_addr_last = sinx;

s = rkb->rkb_rk->rk_conf.socket_cb(sinx->in.sin_family,

SOCK_STREAM, IPPROTO_TCP,

rkb->rkb_rk->rk_conf.opaque);

if (s == -1)

{

rd_snprintf(errstr, errstr_size, "Failed to create socket: %s",

rd_socket_strerror(rd_socket_errno));

return NULL;

}

rktrans = rd_kafka_transport_new(rkb, s, errstr, errstr_size);

if (!rktrans)

goto err;

rd_rkb_dbg(rkb, BROKER, "CONNECT", "Connecting to %s (%s) "

"with socket %i",

rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_FAMILY |

RD_SOCKADDR2STR_F_PORT),

rd_kafka_secproto_names[rkb->rkb_proto], s);

/* Connect to broker */

if (rkb->rkb_rk->rk_conf.connect_cb)

{

rd_kafka_broker_lock(rkb); /* for rkb_nodename */

r = rkb->rkb_rk->rk_conf.connect_cb(

s, (struct sockaddr *)sinx, RD_SOCKADDR_INX_LEN(sinx),

rkb->rkb_nodename, rkb->rkb_rk->rk_conf.opaque);

rd_kafka_broker_unlock(rkb);

}

else

{

if (connect(s, (struct sockaddr *)sinx,

RD_SOCKADDR_INX_LEN(sinx)) == RD_SOCKET_ERROR &&

(rd_socket_errno != EINPROGRESS

))

r = rd_socket_errno;

else

r = 0;

}

if (r != 0)

{

rd_rkb_dbg(rkb, BROKER, "CONNECT",

"couldn't connect to %s: %s (%i)",

rd_sockaddr2str(sinx,

RD_SOCKADDR2STR_F_PORT |

RD_SOCKADDR2STR_F_FAMILY),

rd_socket_strerror(r), r);

rd_snprintf(errstr, errstr_size,

"Failed to connect to broker at %s: %s",

rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_NICE),

rd_socket_strerror(r));

goto err;

}

/* Set up transport handle */

rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = s;

if (rkb->rkb_wakeup_fd[0] != -1)

{

rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt].events = POLLIN;

rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = rkb->rkb_wakeup_fd[0];

}

/* Poll writability to trigger on connection success/failure. */

rd_kafka_transport_poll_set(rktrans, POLLOUT);

return rktrans;

err:

if (s != -1)

rd_kafka_transport_close0(rkb->rkb_rk, s);

if (rktrans)

rd_kafka_transport_close(rktrans);

return NULL;

}

建立与Broker建立的TCP连接,初始化rd_kafka_transport_s对象并返回

int rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans, int timeout_ms);

Poll并处理IO操作

void rd_kafka_transport_io_event (rd_kafka_transport_t *rktrans, int events);

处理IO操作

ssize_t rd_kafka_transport_socket_sendmsg (rd_kafka_transport_t *rktrans,

rd_slice_t *slice,

char *errstr, size_t errstr_size);

系统调用sendmsg方法的封装

ssize_t rd_kafka_transport_send (rd_kafka_transport_t *rktrans,

rd_slice_t *slice,

char *errstr, size_t errstr_size);

系统调用send方法的封装

ssize_t rd_kafka_transport_recv (rd_kafka_transport_t *rktrans,

rd_buf_t *rbuf,

char *errstr, size_t errstr_size);

系统调用recv方法的封装

rd_kafka_transport_t *rd_kafka_transport_new (rd_kafka_broker_t *rkb,

rd_socket_t s,

char *errstr,

size_t errstr_size);

使用已有Socket创建rd_kafka_transport_t对象

int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout);

Poll方法封装

ssize_t rd_kafka_transport_socket_recv (rd_kafka_transport_t *rktrans,

rd_buf_t *buf,

char *errstr, size_t errstr_size) {

#ifndef _MSC_VER

// Windows系统调用封装

return rd_kafka_transport_socket_recvmsg(rktrans, buf,

errstr, errstr_size);

#endif

// Linux系统调用封装

return rd_kafka_transport_socket_recv0(rktrans, buf,

errstr, errstr_size);

}

7、Kafka Meta

Kafka集群的Meta Data包括:所有Broker的信息:IP和Port;

所有Topic的信息:Topic名称,Partition数量,每个Partition的Leader,ISR,Replica集合等。

Kafka集群的每一台Broker都会缓存整个集群的Meta Data,当Broker或某一个Topic的Meta Data信息发生变化时, Kafka集群的Controller都会感知到作相应的状态转换,同时把发生变化的新Meta Data信息广播到所有的Broker。

RdKafka对Meta Data的封装和操作包括Meta Data获取、定时刷新以及引用的操作,如Partition Leader迁移,Partition个数的变化,Broker上下线等等。

Meta Data分为Broker、Topic、Partition三种,定义在rdkafka.h中。

typedef struct rd_kafka_metadata_broker

{

int32_t id; // Broker ID

char *host; // Broker主机名称

int port; // Broker监听端口

} rd_kafka_metadata_broker_t;

typedef struct rd_kafka_metadata_partition

{

int32_t id; // Partition ID

rd_kafka_resp_err_t err; // Broker报告的分区错误

int32_t leader; // 分区Leader Broker

int replica_cnt; // 副本中的Broker数量

int32_t *replicas; // 副本Broker列表

int isr_cnt; // ISR列表中的ISR Broker数量

int32_t *isrs; // ISR Broker列表

} rd_kafka_metadata_partition_t;

/**

* @brief Topic information

*/

typedef struct rd_kafka_metadata_topic

{

char *topic; // Topic名称

int partition_cnt; // 分区数量

struct rd_kafka_metadata_partition *partitions; //

rd_kafka_resp_err_t err; // Broker报告的Topic错误

} rd_kafka_metadata_topic_t;

typedef struct rd_kafka_metadata

{

int broker_cnt; // Broker数量

struct rd_kafka_metadata_broker *brokers; // Broker Meta

int topic_cnt; // Topic数量

struct rd_kafka_metadata_topic *topics; // Topic Meta

int32_t orig_broker_id; // Broker ID

char *orig_broker_name; // Broker名称

} rd_kafka_metadata_t;

rd_kafka_resp_err_t

rd_kafka_metadata (rd_kafka_t *rk, int all_topics,

rd_kafka_topic_t *only_rkt,

const struct rd_kafka_metadata **metadatap,

int timeout_ms);

请求Meta Data数据,阻塞操作

struct rd_kafka_metadata *

rd_kafka_metadata_copy (const struct rd_kafka_metadata *md, size_t size);

深度拷贝Meta Data

rd_kafka_resp_err_t rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb,

rd_kafka_buf_t *request,

rd_kafka_buf_t *rkbuf,

struct rd_kafka_metadata **mdp);

处理Meta Data请求响应

size_t

rd_kafka_metadata_topic_match (rd_kafka_t *rk, rd_list_t *tinfos,

const rd_kafka_topic_partition_list_t *match);

从当前缓存的Meta Data中查找与match匹配的Topic,并将其加入tinfos

size_t

rd_kafka_metadata_topic_filter (rd_kafka_t *rk, rd_list_t *tinfos,

const rd_kafka_topic_partition_list_t *match);

增加缓存Meta Data中与match匹配的所有Topic到tinfos

rd_kafka_resp_err_t

rd_kafka_metadata_refresh_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,

const rd_list_t *topics, int force,

const char *reason);

刷新指定topics的所有Meta Data

rd_kafka_resp_err_t

rd_kafka_metadata_refresh_known_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb,

int force, const char *reason);

刷新已知Topic的Meta Data

rd_kafka_resp_err_t

rd_kafka_metadata_refresh_brokers (rd_kafka_t *rk, rd_kafka_broker_t *rkb,

const char *reason);

根据Broker刷新Meta Data

rd_kafka_resp_err_t

rd_kafka_metadata_refresh_all (rd_kafka_t *rk, rd_kafka_broker_t *rkb,

const char *reason);

刷新集群中所有Topic的Meta Data

rd_kafka_resp_err_t

rd_kafka_metadata_request (rd_kafka_t *rk, rd_kafka_broker_t *rkb,

const rd_list_t *topics,

const char *reason, rd_kafka_op_t *rko);

Meta Data请求

void rd_kafka_metadata_fast_leader_query (rd_kafka_t *rk);

快速刷新分区Leader的Meta Data

8、Kafka Handle对象创建

Kafka生产者、消费者客户端对象通过rd_kafka_new函数进行创建,rd_kafka_new源码如下:

rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,

char *errstr, size_t errstr_size){

...

// 创建conf或指定conf

if (!app_conf)

conf = rd_kafka_conf_new();

else

conf = app_conf;

...

/* Call on_new() interceptors */

rd_kafka_interceptors_on_new(rk, &rk->rk_conf);

...

// 创建队列

rk->rk_rep = rd_kafka_q_new(rk);

rk->rk_ops = rd_kafka_q_new(rk);

rk->rk_ops->rkq_serve = rd_kafka_poll_cb;

rk->rk_ops->rkq_opaque = rk;

...

if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb)

rk->rk_drmode = RD_KAFKA_DR_MODE_CB;

else if (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR)

rk->rk_drmode = RD_KAFKA_DR_MODE_EVENT;

else

rk->rk_drmode = RD_KAFKA_DR_MODE_NONE;

if (rk->rk_drmode != RD_KAFKA_DR_MODE_NONE)

rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR;

if (rk->rk_conf.rebalance_cb)

rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE;

if (rk->rk_conf.offset_commit_cb)

rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT;

if (rk->rk_conf.error_cb)

rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_ERROR;

rk->rk_controllerid = -1;

...

if (type == RD_KAFKA_CONSUMER &&

RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0)

rk->rk_cgrp = rd_kafka_cgrp_new(rk,

rk->rk_group_id,

rk->rk_client_id);

...

// 后台线程和后台事件队列创建

if (rk->rk_conf.background_event_cb)

{

/* Hold off background thread until thrd_create() is done. */

rd_kafka_wrlock(rk);

rk->rk_background.q = rd_kafka_q_new(rk);

rk->rk_init_wait_cnt++;

if ((thrd_create(&rk->rk_background.thread,

rd_kafka_background_thread_main, rk)) != thrd_success)

...

}

/* Create handler thread */

rk->rk_init_wait_cnt++;

if ((thrd_create(&rk->rk_thread, rd_kafka_thread_main, rk)) != thrd_success)

{

...

}

// 启动Logic Broker线程

rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL,

RD_KAFKA_PROTO_PLAINTEXT,

"", 0, RD_KAFKA_NODEID_UA);

// 根据配置增加Broker

if (rk->rk_conf.brokerlist)

{

if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)

rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,

"No brokers configured");

}

...

}

rd_kafka_new主要工作如下;

(1)根据配置设置属性;

(2)创建Kafka Handle对象的OP队列;

(3)创建后台线程和后台事件队列;

(4)创建RdKafka主线程,执行rd_kafka_thread_main函数,主线程名称为rdk:main;

(5)创建Broker内部线程;

(6)根据配置创建Broker线程(每个Broker一个线程)。

int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)

{

return rd_kafka_brokers_add0(rk, brokerlist);

}

int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist)

{

...

if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) &&

rkb->rkb_source == RD_KAFKA_CONFIGURED)

{

cnt++;

}

else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED,

proto, host, port,

RD_KAFKA_NODEID_UA) != NULL)

cnt++;

...

}

rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,

rd_kafka_confsource_t source,

rd_kafka_secproto_t proto,

const char *name, uint16_t port,

int32_t nodeid)

{

...

thrd_create(&rkb->rkb_thread, rd_kafka_broker_thread_main, rkb);

...

}

static int rd_kafka_broker_thread_main (void *arg)

{

rd_kafka_set_thread_name("%s", rkb->rkb_name);

rd_kafka_set_thread_sysname("rdk:broker%"PRId32, rkb->rkb_nodeid);

...

rd_kafka_broker_serve(rkb, rd_kafka_max_block_ms);

...

rd_kafka_broker_ops_serve(rkb, RD_POLL_NOWAIT);

...

}

9、Producer生产消息过程

(1)rd_kafka_produce

rd_kafka_produce函数位于rdkafka_msg.c文件:

int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,

int msgflags,

void *payload, size_t len,

const void *key, size_t keylen,

void *msg_opaque) {

return rd_kafka_msg_new(rd_kafka_topic_a2i(rkt), partition,

msgflags, payload, len,

key, keylen, msg_opaque);

}

(2)rd_kafka_msg_new

rd_kafka_msg_new函数位于rdkafka_msg.c文件:

int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,

int msgflags,

char *payload, size_t len,

const void *key, size_t keylen,

void *msg_opaque)

{

...

// 创建rd_kafka_msg_t消息

rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags,

payload, len, key, keylen, msg_opaque,

&err, &errnox, NULL, 0, rd_clock());

...

// 对消息进行分区分配

err = rd_kafka_msg_partitioner(rkt, rkm, 1);

...

}

rd_kafka_msg_new内部通过rd_kafka_msg_new0创建Kafka消息,使用rd_kafka_msg_partitioner对Kafka消息进行分区分配。

(3)rd_kafka_msg_partitioner

rd_kafka_msg_partitioner函数位于rdkafka_msg.c文件:

int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,

rd_dolock_t do_lock)

{

// 获取分区号

...

// 获取分区

s_rktp_new = rd_kafka_toppar_get(rkt, partition, 0);

...

rktp_new = rd_kafka_toppar_s2i(s_rktp_new);

rd_atomic64_add(&rktp_new->rktp_c.producer_enq_msgs, 1);

/* Update message partition */

if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA)

rkm->rkm_partition = partition;

// 将消息入队分区队列

rd_kafka_toppar_enq_msg(rktp_new, rkm);

...

}

rd_kafka_msg_partitioner内部通过通过rd_kafka_toppar_enq_msg将分区加入分区队列。

(4)rd_kafka_toppar_enq_msg

void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm)

{

...

// 入队列

if (rktp->rktp_partition == RD_KAFKA_PARTITION_UA ||

rktp->rktp_rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO)

{

queue_len = rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);

}

else

{

queue_len = rd_kafka_msgq_enq_sorted(rktp->rktp_rkt, &rktp->rktp_msgq, rkm);

}

...

}

(5)rd_kafka_msgq_enq

static RD_INLINE RD_UNUSED int rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq,

rd_kafka_msg_t *rkm)

{

TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link);

rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len;

return (int)++rkmq->rkmq_msg_cnt;

}

(6)rd_kafka_msgq_enq_sorted

rd_kafka_msgq_enq_sorted函数位于rdkafka_msg.c文件:

int rd_kafka_msgq_enq_sorted (const rd_kafka_itopic_t *rkt,

rd_kafka_msgq_t *rkmq,

rd_kafka_msg_t *rkm)

{

rd_dassert(rkm->rkm_u.producer.msgid != 0);

return rd_kafka_msgq_enq_sorted0(rkmq, rkm,

rkt->rkt_conf.msg_order_cmp);

}

int rd_kafka_msgq_enq_sorted0 (rd_kafka_msgq_t *rkmq,

rd_kafka_msg_t *rkm,

int (*order_cmp) (const void *, const void *))

{

TAILQ_INSERT_SORTED(&rkmq->rkmq_msgs, rkm, rd_kafka_msg_t *,

rkm_link, order_cmp);

rkmq->rkmq_msg_bytes += rkm->rkm_len + rkm->rkm_key_len;

return ++rkmq->rkmq_msg_cnt;

}

队列的操作位于rdsysqueue.h文件中。

rd_kafka_broker_add函数位于rdkafka_broker.c文件:

rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,

rd_kafka_confsource_t source,

rd_kafka_secproto_t proto,

const char *name, uint16_t port,

int32_t nodeid)

{

rd_kafka_broker_t *rkb;

rkb = rd_calloc(1, sizeof(*rkb));

// 设置rd_kafka_broker_t对象属性

...

if (thrd_create(&rkb->rkb_thread, rd_kafka_broker_thread_main, rkb) != thrd_success)

{

...

}

}

rd_kafka_broker_add创建Broker线程,启动执行rd_kafka_broker_thread_main函数。

static int rd_kafka_broker_thread_main (void *arg)

{

...

rd_kafka_set_thread_name("%s", rkb->rkb_name);

rd_kafka_set_thread_sysname("rdk:broker%"PRId32, rkb->rkb_nodeid);

...

rd_kafka_broker_serve(rkb, ...);

...

}

static void rd_kafka_broker_serve (rd_kafka_broker_t *rkb, int timeout_ms) {

...

if (rkb->rkb_source == RD_KAFKA_INTERNAL)

rd_kafka_broker_internal_serve(rkb, abs_timeout);

else if (rkb->rkb_rk->rk_type == RD_KAFKA_PRODUCER)

rd_kafka_broker_producer_serve(rkb, abs_timeout);

else if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER)

rd_kafka_broker_consumer_serve(rkb, abs_timeout);

}

static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb,

rd_ts_t abs_timeout)

{

//

rd_kafka_broker_produce_toppars(rkb, now, &next_wakeup,

do_timeout_scan);

rd_kafka_broker_ops_io_serve(rkb, next_wakeup);

}

static void rd_kafka_broker_ops_io_serve (rd_kafka_broker_t *rkb,

rd_ts_t abs_timeout)

{

...

rd_kafka_broker_ops_serve(rkb, rd_timeout_remains_us(abs_timeout));

...

}

static int rd_kafka_broker_ops_serve (rd_kafka_broker_t *rkb,

rd_ts_t timeout_us)

{

rd_kafka_op_t *rko;

int cnt = 0;

while ((rko = rd_kafka_q_pop(rkb->rkb_ops, timeout_us, 0)) &&

(cnt++, rd_kafka_broker_op_serve(rkb, rko)))

timeout_us = RD_POLL_NOWAIT;

return cnt;

}

rdkafka_broker.c文件:

static ssize_t rd_kafka_broker_send (rd_kafka_broker_t *rkb, rd_slice_t *slice)

{

...

r = rd_kafka_transport_send(rkb->rkb_transport, slice,

errstr, sizeof(errstr));

...

}

rdkafka_transport.c文件:

ssize_t rd_kafka_transport_send (rd_kafka_transport_t *rktrans,

rd_slice_t *slice, char *errstr, size_t errstr_size)

{

..

r = rd_kafka_transport_socket_send(rktrans, slice,

errstr, errstr_size);

...

}

static ssize_t rd_kafka_transport_socket_send (rd_kafka_transport_t *rktrans,

rd_slice_t *slice,

char *errstr, size_t errstr_size) {

#ifndef _MSC_VER

/* FIXME: Use sendmsg() with iovecs if there's more than one segment

* remaining, otherwise (or if platform does not have sendmsg)

* use plain send(). */

return rd_kafka_transport_socket_sendmsg(rktrans, slice,

errstr, errstr_size);

#endif

return rd_kafka_transport_socket_send0(rktrans, slice,

errstr, errstr_size);

}

static ssize_t rd_kafka_transport_socket_sendmsg (rd_kafka_transport_t *rktrans,

rd_slice_t *slice,

char *errstr, size_t errstr_size)

{

...

r = sendmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT

...

}

10、Consumer消费消息过程

(1)开启消息消费

RdKafka提供了rd_kafka_consume_start、rd_kafka_consume、rd_kafka_consume_start_queue、rd_kafka_consume_queue接口用于消息消费。

int rd_kafka_consume_start0 (rd_kafka_itopic_t *rkt, int32_t partition,

int64_t offset, rd_kafka_q_t *rkq) {

shptr_rd_kafka_toppar_t *s_rktp;

if (partition < 0) {

rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,

ESRCH);

return -1;

}

if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) {

rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);

return -1;

}

rd_kafka_topic_wrlock(rkt);

s_rktp = rd_kafka_toppar_desired_add(rkt, partition);

rd_kafka_topic_wrunlock(rkt);

/* Verify offset */

if (offset == RD_KAFKA_OFFSET_BEGINNING ||

offset == RD_KAFKA_OFFSET_END ||

offset <= RD_KAFKA_OFFSET_TAIL_BASE) {

/* logical offsets */

} else if (offset == RD_KAFKA_OFFSET_STORED) {

/* offset manager */

if (rkt->rkt_conf.offset_store_method ==

RD_KAFKA_OFFSET_METHOD_BROKER &&

RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id)) {

/* Broker based offsets require a group id. */

rd_kafka_toppar_destroy(s_rktp);

rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,

EINVAL);

return -1;

}

} else if (offset < 0) {

rd_kafka_toppar_destroy(s_rktp);

rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,

EINVAL);

return -1;

}

rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_s2i(s_rktp), offset,

rkq, RD_KAFKA_NO_REPLYQ);

rd_kafka_toppar_destroy(s_rktp);

rd_kafka_set_last_error(0, 0);

return 0;

}

int rd_kafka_consume_start (rd_kafka_topic_t *app_rkt, int32_t partition,

int64_t offset) {

rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);

rd_kafka_dbg(rkt->rkt_rk, TOPIC, "START",

"Start consuming partition %"PRId32,partition);

return rd_kafka_consume_start0(rkt, partition, offset, NULL);

}

int rd_kafka_consume_start_queue (rd_kafka_topic_t *app_rkt, int32_t partition,

int64_t offset, rd_kafka_queue_t *rkqu) {

rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);

return rd_kafka_consume_start0(rkt, partition, offset, rkqu->rkqu_q);

}

static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk,

rd_kafka_q_t *rkq,

int timeout_ms) {

rd_kafka_op_t *rko;

rd_kafka_message_t *rkmessage = NULL;

rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

if (timeout_ms)

rd_kafka_app_poll_blocking(rk);

rd_kafka_yield_thread = 0;

while ((rko = rd_kafka_q_pop(rkq,

rd_timeout_remains_us(abs_timeout), 0))) {

rd_kafka_op_res_t res;

res = rd_kafka_poll_cb(rk, rkq, rko,

RD_KAFKA_Q_CB_RETURN, NULL);

if (res == RD_KAFKA_OP_RES_PASS)

break;

if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||

rd_kafka_yield_thread)) {

/* Callback called rd_kafka_yield(), we must

* stop dispatching the queue and return. */

rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR,

EINTR);

rd_kafka_app_polled(rk);

return NULL;

}

/* Message was handled by callback. */

continue;

}

if (!rko) {

/* Timeout reached with no op returned. */

rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,

ETIMEDOUT);

rd_kafka_app_polled(rk);

return NULL;

}

rd_kafka_assert(rk,

rko->rko_type == RD_KAFKA_OP_FETCH ||

rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR);

/* Get rkmessage from rko */

rkmessage = rd_kafka_message_get(rko);

/* Store offset */

rd_kafka_op_offset_store(rk, rko);

rd_kafka_set_last_error(0, 0);

rd_kafka_app_polled(rk);

return rkmessage;

}

rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *app_rkt,

int32_t partition,

int timeout_ms) {

rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);

shptr_rd_kafka_toppar_t *s_rktp;

rd_kafka_toppar_t *rktp;

rd_kafka_message_t *rkmessage;

rd_kafka_topic_rdlock(rkt);

s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);

if (unlikely(!s_rktp))

s_rktp = rd_kafka_toppar_desired_get(rkt, partition);

rd_kafka_topic_rdunlock(rkt);

if (unlikely(!s_rktp)) {

/* No such toppar known */

rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,

ESRCH);

return NULL;

}

rktp = rd_kafka_toppar_s2i(s_rktp);

rkmessage = rd_kafka_consume0(rkt->rkt_rk,

rktp->rktp_fetchq, timeout_ms);

rd_kafka_toppar_destroy(s_rktp); /* refcnt from .._get() */

return rkmessage;

}

rd_kafka_message_t *rd_kafka_consume_queue (rd_kafka_queue_t *rkqu,

int timeout_ms) {

return rd_kafka_consume0(rkqu->rkqu_rk, rkqu->rkqu_q, timeout_ms);

}

(2)Poll轮询消息队列

int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms) {

int r;

if (timeout_ms)

rd_kafka_app_poll_blocking(rk);

r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0,

RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);

rd_kafka_app_polled(rk);

return r;

}

rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,

int timeout_ms) {

rd_kafka_cgrp_t *rkcg;

if (unlikely(!(rkcg = rd_kafka_cgrp_get(rk)))) {

rd_kafka_message_t *rkmessage = rd_kafka_message_new();

rkmessage->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;

return rkmessage;

}

return rd_kafka_consume0(rk, rkcg->rkcg_q, timeout_ms);

}

rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms) {

rd_kafka_op_t *rko;

if (timeout_ms)

rd_kafka_app_poll_blocking(rkqu->rkqu_rk);

rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0, RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL);

rd_kafka_app_polled(rkqu->rkqu_rk);

if (!rko)

return NULL;

return rko;

}

二、RdKafka C++源码分析

1、C++ API对C API的封装

C++ API主要是对RdKafka C API的封装,根据不同的功能模块封装为不同功能类,类定义在rdkafkacpp.h文件中,并使用RdKafka命名空间进行限定,主要类如下Conf、Handle、TopicPartition、Topic、Message、Queue、KafkaConsumer、Consumer、Producer、BrokerMetadata、PartitionMetadata、TopicMetadata、Metadata、DeliveryReportCb、PartitionerCb、PartitionerKeyPointerCb、EventCb、Event、ConsumeCb:Consume、RebalanceCb、OffsetCommitCb、SocketCb、OpenCb。

2、Consumer与KafkaConsumer

Consumer对partition、offset有完全的控制能力;KafkaConsumer提供了Topic订阅接口,默认使用latest消费方式,可以通过assign方法指定开始消费的partition和offset。

3、Producer生产消息过程

(1)Producer创建

RdKafka::Producer *RdKafka::Producer::create (RdKafka::Conf *conf,

std::string &errstr) {

char errbuf[512];

RdKafka::ConfImpl *confimpl = dynamic_cast<:confimpl>(conf);

RdKafka::ProducerImpl *rkp = new RdKafka::ProducerImpl();

rd_kafka_conf_t *rk_conf = NULL;

if (confimpl) {

if (!confimpl->rk_conf_) {

errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";

delete rkp;

return NULL;

}

rkp->set_common_config(confimpl);

rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_);

if (confimpl->dr_cb_) {

rd_kafka_conf_set_dr_msg_cb(rk_conf, dr_msg_cb_trampoline);

rkp->dr_cb_ = confimpl->dr_cb_;

}

}

rd_kafka_t *rk;

if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf,

errbuf, sizeof(errbuf)))) {

errstr = errbuf;

// rd_kafka_new() takes ownership only if succeeds

if (rk_conf)

rd_kafka_conf_destroy(rk_conf);

delete rkp;

return NULL;

}

rkp->rk_ = rk;

return rkp;

}

创建Producer时需要准备好Conf对象。

(2)生产消息

RdKafka::ErrorCode RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,

int32_t partition,

int msgflags,

void *payload, size_t len,

const std::string *key,

void *msg_opaque) {

RdKafka::TopicImpl *topicimpl = dynamic_cast<:topicimpl>(topic);

if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags,

payload, len,

key ? key->c_str() : NULL, key ? key->size() : 0,

msg_opaque) == -1)

return static_cast<:errorcode>(rd_kafka_last_error());

return RdKafka::ERR_NO_ERROR;

}

生产消息时需要指定Topic对象。

(3)Poll轮询

int RdKafka::HandleImpl::poll(int timeout_ms)

{

return rd_kafka_poll(rk_, timeout_ms);

}

produce生产消息是异步的,将消息放入到内部队列后会立刻返回,因此需要由poll返回最终写入结果。 produce是尽力送达的,会在尝试直至超过message.timeout.ms才汇报失败。

4、Consumer消费消息过程

(1)创建KafkaConsumer

RdKafka::KafkaConsumer *RdKafka::KafkaConsumer::create (RdKafka::Conf *conf,

std::string &errstr) {

char errbuf[512];

RdKafka::ConfImpl *confimpl = dynamic_cast<:confimpl>(conf);

RdKafka::KafkaConsumerImpl *rkc = new RdKafka::KafkaConsumerImpl();

rd_kafka_conf_t *rk_conf = NULL;

size_t grlen;

if (!confimpl || !confimpl->rk_conf_) {

errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";

delete rkc;

return NULL;

}

if (rd_kafka_conf_get(confimpl->rk_conf_, "group.id",

NULL, &grlen) != RD_KAFKA_CONF_OK ||

grlen <= 1 /* terminating null only */) {

errstr = "\"group.id\" must be configured";

delete rkc;

return NULL;

}

rkc->set_common_config(confimpl);

rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_);

rd_kafka_t *rk;

if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, rk_conf,

errbuf, sizeof(errbuf)))) {

errstr = errbuf;

// rd_kafka_new() takes ownership only if succeeds

rd_kafka_conf_destroy(rk_conf);

delete rkc;

return NULL;

}

rkc->rk_ = rk;

/* Redirect handle queue to cgrp's queue to provide a single queue point */

rd_kafka_poll_set_consumer(rk);

return rkc;

}

(2)订阅Topic

RdKafka::ErrorCode

RdKafka::KafkaConsumerImpl::subscribe (const std::vector<:string> &topics) {

rd_kafka_topic_partition_list_t *c_topics;

rd_kafka_resp_err_t err;

c_topics = rd_kafka_topic_partition_list_new((int)topics.size());

for (unsigned int i = 0 ; i < topics.size() ; i++)

rd_kafka_topic_partition_list_add(c_topics, topics[i].c_str(),

RD_KAFKA_PARTITION_UA);

err = rd_kafka_subscribe(rk_, c_topics);

rd_kafka_topic_partition_list_destroy(c_topics);

return static_cast<:errorcode>(err);

}

(3)消费消息

RdKafka::Message *RdKafka::KafkaConsumerImpl::consume (int timeout_ms) {

rd_kafka_message_t *rkmessage;

rkmessage = rd_kafka_consumer_poll(this->rk_, timeout_ms);

if (!rkmessage)

return new RdKafka::MessageImpl(NULL, RdKafka::ERR__TIMED_OUT);

return new RdKafka::MessageImpl(rkmessage);

}

三、RdKafka多线程设计

1、Producer/Consumer多线程设计

RdKafka内部使用多线程对硬件资源进行充分利用,RdKafka API是线程安全的,应用程序可以在任意时间调用其线程内的任意API函数。

每个Producer/Consumer实例会创建线程如下:

(1)应用线程,处理具体应用业务逻辑。

(2)Kafka Handler线程:每创建一个Producer/Consumer即会创建一个Handler线程,即RdKafka主线程,线程名称为rdk::main,线程执行函数为rd_kafka_thread_main。

(3)Kafka Broker线程:对于增加到Producer/Consumer的每个Broker会创建一个线程,负责与Broker通信,线程执行函数为rd_kafka_broker_thread_main,线程名称为rdk::brokerxxx。

(4)Inner Broker线程,用于处未分配分区的OP操作队列。

(5)后台线程

如果配置对象设置了background_event_cb,Kafka Handler创建时会创建相应的后台线程和后台队列,线程执行函数为rd_kafka_background_thread_main。

2、线程查看

Linux查看KafkaConsumer进程的线程的方法:

ps -T -p pid

top -H -p pid

Cosnumer线程查看结果:

Producer线程查看结果:

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

rdkafka线程过多_Kafka快速入门(十一)——RdKafka源码分析 的相关文章

  • uni-app store 状态管理学习,多写几遍就会了

    uni app使用了一段时间了 一直没有用到store 状态管理 还是应该学习一下 以后会用到的 1 使用hbuiderx创建uni app项目 2 与static同级创建store文件夹 store文件夹下创建index js 3 关键i
  • sqlserver数据类型转换(将 nvarchar 转换为数据类型 numeric 时出现算术溢出错误)

    一般情况下 sqlserver会自动完成数据转换 但这种转换有时候很容易出错 尤其是nvarchar转换为numeric时 如果能够明确数据类型 最好显式转换 举个我遇到的例子 SELECT FROM ITEM INFO TEST WHER
  • BLE学习(4):蓝牙地址类型和设备的隐私

    蓝牙地址也被称为蓝牙MAC地址 它能唯一标识一个蓝牙设备的48位的值 在蓝牙规范中 它被称为BD ADDR 蓝牙的地址类型可以分为两种 public addresses和random addresses 其中random addresses
  • visual studio2019创建运行第一个C++详细步骤与断点调试的简单认识

    是去官网下的社区版 可以用自己的微软账号登录也不需要网上找破解版了 安装过程还是很顺利的 下面在第一次使用vs2019下创建C 项目 依次helloworld 计算器类 首先明确 在vs中是使用项目来组织代码 使用解决方案来组织项目 所以首
  • adb连接及常用命令

    adb命令连接模拟器设备 以夜神模拟器为例 夜神模拟器默认端口为62001 adb connect 127 0 0 1 62001 当打开了多个模拟器 设备连接失败时 解决方法 查看连接的设备 可以查看连接的adb的设备情况 如果连接多个设
  • 02_计算机网络笔记-网络拓扑-交换机-VLAN

    文章目录 一般家庭的网络拓扑 交换机的基本原理与配置 虚拟局域网VLAN 个人博客 https blog csdn net cPen web 一般家庭的网络拓扑 光猫 调制解调器 1 光信号和电信号的转换 2 路由器的功能 可以拨号 账号和
  • MyBatis 采用注解方式批量更新数据 @Mapper @Update (包含2种方法)

    批量更新数据方法 1 注释db filed name 表示的是的数据库字段名字 entity name 表示的是你的实体字段 table name 表示你的表名 Update
  • ONNXRUNTUIME c++使用(分割网络)与相关资料(暂记)

    下面的教程是在linux系统上运行的 如果想在windows系统上运行 可以看官方链接或中文教程https bbs huaweicloud com blogs 335706 官方链接中有完整的VS的带 sln的项目 ONNXRUNTUIME
  • ES学习笔记之-ClusterState的学习

    前面研究过ES的get api的整体思路 作为编写ES插件时的借鉴 当时的重点在与理解整体流程 主要是shardOperation 的方法内部的调用逻辑 就弱化了shards 方法 实际上shards 方法在理解ES的结构层面 作用更大一些
  • JAVA获取IP地址、电脑Mac地址

    1 获取IP地址 注意 IP地址经过多次反向代理后会有多个IP值 其中第一个IP才是真实IP 所以不能通过 request getRemoteAddr 获取IP地址 如果使用了多级反向代理的话 X Forwarded For的值并不止一个
  • javaWeb项目中分页和模糊查询技术

    分页 需求 登录成功后 展现全部时 出现分页 思路 前端 1 设置分页按钮 以及分页数据 页码 总页数 总条数 2 设置分页请求 即点击上一页 下一页时发请求 后端 3 web xml映射 映射到Servlet能接收请求 4 Dao查询分页
  • opencv实践项目-人脸检测

    目录 1 opencv CascadeClassifier人脸检测步骤 2 CascadeClassifier分类器简介 2 1 从文件中加载级联分类器 2 2 目标检测方法 3 代码实现 1 opencv CascadeClassifie
  • SRVE0255E: A WebGroup/Virtual Host to handle /p2pd/servlet/dispatch has not been defined.

    Technote troubleshooting Problem Abstract When setting up IBM Cognos within IBM WebSphere the URI is not accessible The
  • 【MySQL】轻松学习 普通索引

    目录 引言 一 普通索引的创建 1 创建表时定义索引 2 已存在的表上创建索引 3 ALTER TABLE 语句创建索引 二 查看索引执行情况 引言 创建索引是指在某个表的一列或多列上建立一个索引 以便提高对表的访问速度 创建索引有3种方式
  • umijs----路由(修改路由的某一个path )

    1 在src下创建app js ts tsx 2 修改路由 export function patchRoutes routes routes为 umirc ts中设置的routes数组 可以使用数组的方法插入删除 运行时在最前面插入一个路
  • Webpack配置Vue热更新

    Webpack配置Vue热更新 需要的包 cnpm i vue webpack webpack cli webpack dev server html webpack plugin clean webpack plugin style lo
  • 【正点原子MP157连载】第七章 认识HAL库-摘自【正点原子】STM32MP1 M4裸机CubeIDE开发指南

    1 实验平台 正点原子STM32MP157开发板 2 购买链接 https item taobao com item htm id 629270721801 3 全套实验源码 手册 视频下载地址 http www openedv com t
  • selenium4

    1 单选框和复选框 单选框 type radio 定位 gt 点击 判断是否被选中 元素 is selected 复选框 type checkbox 只选择一个 gt 同单选框一样 全选 定位所有复选框 遍历 判断是否被选中 点击 选择部分
  • java入门六:java基础终章

    1 static关键字 静态变量和类一起加载 final修饰后的类无法被继承 2 抽象类 abstract修饰符可以用来修饰方法也可以修饰类 如果修饰方法 那么该方法就是抽象方法 如果修饰类 那么该类就是抽象类 抽象类中可以没有抽象方法 但

随机推荐

  • Linux Shell程序设计(2)

    实验十一 Shell程序设计 2 一 实验要求 综合运用shell编程知识进行设计性编程 二 实验内容和实验步骤 1 实验内容 假设你作为某工厂生产管理员 需要负责统计各车间每天生产的产品数据 你的计算机安装了双硬盘 为了保证数据安全 你在
  • 【定位问题】Mybatis-plus的selectPage()分页查询不生效问题

    背景 项目需要从mybits切换到mubits plus 但是我在进行分页查询的时候 发现一直不生效 问题原因 添加监听器 配置如下 Configuration MapperScan com baomidou mybatisplus sam
  • parted创建硬盘分区并创建LVM

    目的 将两个三T的硬盘做成LVM sdc sdd 一 parted将硬盘进行分区 1 parted的命令方式 Parted 命令分为两种模式 命令行模式和交互模式 1 命令行模式 parted option device command 该
  • 【原创】第一个iOS应用程序

    摘要 第一个iOS应用程序 包括获取控件 绑定事件 设置属性等内容 iOS Objective C 目录 第一章 窗口与应用程序 第二章 添加视图 2 1 从nib文件初始化视图 2 2 使用脚本添加视图 第三章 添加子视图 3 1 通过x
  • 制作自己的 Kindle 电子书

    想象以下场景 你刚收到一台新的 Kindle Paperwhite 心中已然响起了轰轰烈烈的 我今年 或这个冬天 一定要阅读 100 本书 结果发现 想看的书 Amazon 上找不到 或者排版很糟糕 如何解决 自己动手做呗 准备工作 我使用
  • UE4 UI实现改键功能

    主要内容 本文主要讲解如何在UI中实现自定义按键的功能类似于游戏中的改键操作 用到的是UE4自带的第三人称案例 因为第三人称自带了小白人和几个按键绑定就不用再手动去设置 实现步骤 1 创建两个UMG用来展示UI效果 1 创建WBP Key
  • C++链表合并

    有l1和l2两个链表 这两个链表降序排列 把l2合并到l1中 并按降序排列 同时清空l2链表 例如l1 9 8 7 6 l2 12 11 10 5 4 3 2 1 合并后l1 12 11 10 9 8 7 6 5 4 3 2 1 l2 in
  • 【Android】利用intent启动浏览器

    文章目录 一 默认浏览器 二 指定浏览器 三 选择浏览器 一 默认浏览器 需要设置Action和Date属性 构造 Uri uri Uri parse https www baidu com Intent intent new Intent
  • SCADE Suite 状态机之变量隐式赋值

    SCADE Suite 状态机之变量隐式赋值 1 变量的隐式赋值 目的 简化模型设计 Last 只要没有显示赋值 便取上一周期的数值 Default 只要没有显示赋值 便取默认设置的数值 优先级更高 设置方法 2 定义变量的Last值 1
  • LeetCode 817:链表组件(计数)

    解法一 常规解法 建图 DFS 时间复杂度O n O n 空间复杂度因为需要存储图 所以是O n 这种方法是通解 对于所有图都适用 Definition for singly linked list struct ListNode int
  • lua元表以及元方法

    知微出凡 lua元表以及元方法 lua中的变量是没有数据类型的 值有类型 类型有八种nil number boolean string function thread userdata以及table Lua 中的每个值都可以有一个 元表 这
  • 62.[GIS基础]笛卡尔坐标系

    文章目录 笛卡尔坐标系 多坐标系 坐标系的嵌套 坐标变换 坐标系转换 转载请注明原始链接 http blog csdn net a464057216 article details 54578069 后续此博客不再更新 欢迎大家搜索关注微信
  • 基于粒子群算法(PSO)优化径向基神经网络(PSO-RBF)的分类预测。matlab代码,优化参数为扩散速度,采用交叉验证。多特征输入单输出的二分类及多分类模型。程序内注释详细,直接替换数据就

    清空环境变量 warning off 关闭报警信息 close all 关闭开启的图窗 clear 清空变量 clc 清空命令行 读取数据 res xlsread 数据集 xlsx 分析数据 num class length unique
  • 抛去容抗角度,从电容充放电角度理解RC低通滤波器

  • 和你一起从零开始写RISC-V处理器(2)

    RISC V加法指令的实现 文章目录 RISC V加法指令的实现 上期回顾 一 正片开始 编写各个模块 pc reg模块 if模块 rom模块 if id模块 id模块 regs模块 id ex模块 ex模块 二 顶层模块搭建 三 测试文件
  • 学懂最小生成树(克鲁斯卡尔算法)

    本节 小编将带大家了解最小生成树的第二种构成算法 克鲁斯卡尔算法 Kruskal algorithm 当然 对另一种算法感兴趣的朋友可以看看之前的这篇文章 学懂最小生成树 普里姆算法 目录 一 实现原理 二 代码实现 一 实现原理 克鲁斯卡
  • 后端开发——Flask框架从入门到入坟(中)

    前言 在上一篇文章中荔枝已经梳理了Flask的基础语法 但是想要靠这些东西来写一个项目是远远不够的噢 我们还需要一个更加清晰的项目逻辑来搭建一个Flask后端项目框架 在真实的项目开发中 我们还需要了解如何搭建数据库 如何管理高效管理代码
  • leetcode刷题——栈与队列

    队列 vs 栈 栈 从头进 从头出 只有头部一个进出口 队列 从尾进 从头处 头和尾分别负责出和进 适用于配对问题 20 有效的括号 运用栈尾进头出的思想实现配对 当我们遇到一个左括号时 我们会期望在后续的遍历中 有一个相同类型的右括号将其
  • js 判断数组是否有元素重复

    这里有一个js数组 判断数组是否有重复元素 具体代码 var vecotr for i 0 i
  • rdkafka线程过多_Kafka快速入门(十一)——RdKafka源码分析

    Kafka快速入门 十一 RdKafka源码分析 一 RdKafka C源码分析 1 Kafka OP队列 RdKafka将与Kafka Broke的交互 内部实现的操作都封装成Operator结构 然后放入OP处理队列里统一处理 Kafk