Linux中断处理的“下半部”机制

2023-05-16

前言

中断分为硬件中断,软件中断。中断的处理原则主要有两个:一个是不能嵌套,另外一个是越快越好。在Linux中,分为中断处理采用“上半部”和“下半部”处理机制。

一、中断处理“下半部”机制

中断服务程序一般都是在中断请求关闭的条件下执行的,以避免嵌套而使中断控制复杂化。但是,中断是一个随机事件,它随时会到来,如果关中断的时间太长,CPU就不能及时响应其他的中断请求,从而造成中断的丢失。

因此,Linux内核的目标就是尽可能快的处理完中断请求,尽其所能把更多的处理向后推迟。例如,假设一个数据块已经达到了网线,当中断控制器接受到这个中断请求信号时,Linux内核只是简单地标志数据到来了,然后让处理器恢复到它以前运行的状态,其余的处理稍后再进行(如把数据移入一个缓冲区,接受数据的进程就可以在缓冲区找到数据)。

因此,内核把中断处理分为两部分:上半部(top-half)和下半部(bottom-half),上半部 (就是中断服务程序)内核立即执行,而下半部(就是一些内核函数)留着稍后处理。

首先:一个快速的“上半部”来处理硬件发出的请求,它必须在一个新的中断产生之前终止。通常,除了在设备和一些内存缓冲区(如果你的设备用到了DMA,就不止这些)之间移动或传送数据,确定硬件是否处于健全的状态之外,这一部分做的工作很少。
第二:“下半部”运行时是允许中断请求的,而上半部运行时是关中断的,这是二者之间的主要区别。

内核到底什么时候执行下半部,以何种方式组织下半部?

这就是我们要讨论的下半部实现机制,这种机制在内核的演变过程中不断得到改进,在以前的内核中,这个机制叫做bottom-half(以下简称BH)。但是,Linux的这种bottom-half机制有两个缺点:

  • 在任意一时刻,系统只能有一个CPU可以执行BH代码,以防止两个或多个CPU同时来执行BH函数而相互干扰。因此BH代码的执行是严格“串行化”的。
  • BH函数不允许嵌套。

这两个缺点在单CPU系统中是无关紧要的,但在SMP系统中却是非常致命的。因为BH机制的严格串行化执行显然没有充分利用SMP系统的多CPU特点。为此,在2.4以后的版本中有了新的发展和改进,改进的目标使下半部可以在多处理机上并行执行,并有助于驱动程序的开发者进行驱动程序的开发。下面主要介绍3种5.4内核中的“下半部”处理机制:

  • 软中断请求(softirq)机制
  • 小任务(tasklet)机制
  • 工作队列机制
  • threaded_irq机制

以上三种机制的比较如下图所示

img

二、软中断请求(softirq)机制

Linux的softirq机制是与SMP紧密不可分的。为此,整个softirq机制的设计与实现中自始自终都贯彻了一个思想:“谁触发,谁执行”(Who marks,Who runs),也即触发软中断的那个CPU负责执行它所触发的软中断,而且每个CPU都有它自己的软中断触发与控制机制。这个设计思想也使得softirq机制充分利用了SMP系统的性能和特点。

2.1 软中断描述符

Linux在include/linux/interrupt.h头文件中定义了数据结构softirq_action,来描述一个软中断请求,如下所示:

/* PLEASE, avoid to allocate new softirqs, if you need not _really_ high
   frequency threaded job scheduling. For almost all the purposes
   tasklets are more than enough. F.e. all serial device BHs et
   al. should be converted to tasklets, not to softirqs.
 */

enum
{
	HI_SOFTIRQ=0,	//用于实现高优先级的软中断
	TIMER_SOFTIRQ,
	NET_TX_SOFTIRQ,	//用于实现网络数据的发送
	NET_RX_SOFTIRQ, //用于实现网络数据的接收
	BLOCK_SOFTIRQ,
	IRQ_POLL_SOFTIRQ,
	TASKLET_SOFTIRQ, //用于实现tasklet软中断
	SCHED_SOFTIRQ,
	HRTIMER_SOFTIRQ, /* Unused, but kept as tools rely on the
			    numbering. Sigh! */
	RCU_SOFTIRQ,    /* Preferable RCU should always be the last softirq */

	NR_SOFTIRQS		//大小等于10,最后一个
};

/* map softirq index to softirq name. update 'softirq_to_name' in
 * kernel/softirq.c when adding a new softirq.
 */
extern const char * const softirq_to_name[NR_SOFTIRQS];

/* softirq mask and active fields moved to irq_cpustat_t in
 * asm/hardirq.h to get better cache usage.  KAO
 */

struct softirq_action
{
	void	(*action)(struct softirq_action *);	//函数指针action指向软中断请求的服务函数
};

asmlinkage void do_softirq(void);
asmlinkage void __do_softirq(void);

其中,函数指针action指向软中断请求的服务函数。基于上述软中断描述符,Linux在kernel/softirq.c文件中定义了一个全局的softirq_vec数组:

static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;

在这里系统一共定义了10个软中断请求描述符。软中断向量i(0≤i≤9)所对应的软中断请求描述符就是softirq_vec[i]。这个数组是个系统全局数组,即它被所有的CPU所共享。这里需要注意的一点是:每个CPU虽然都有它自己的触发和控制机制,并且只执行他自己所触发的软中断请求,但是各个CPU所执行的软中断服务例程却是相同的,也即都是执行softirq_vec[ ]数组中定义的软中断服务函数。Linux在kernel/softirq.c中的相关代码如下:

/*
   - No shared variables, all the data are CPU local.
   - If a softirq needs serialization, let it serialize itself
     by its own spinlocks.
   - Even if softirq is serialized, only local cpu is marked for
     execution. Hence, we get something sort of weak cpu binding.
     Though it is still not clear, will it result in better locality
     or will not.

   Examples:
   - NET RX softirq. It is multithreaded and does not require
     any global serialization.
   - NET TX softirq. It kicks software netdevice queues, hence
     it is logically serialized per device, but this serialization
     is invisible to common code.
   - Tasklets: serialized wrt itself.
 */

#ifndef __ARCH_IRQ_STAT
DEFINE_PER_CPU_ALIGNED(irq_cpustat_t, irq_stat);	//较老版本内核irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;
EXPORT_PER_CPU_SYMBOL(irq_stat);	//较老版本内核EXPORT_SYMBOL(irq_stat)
#endif

static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;

DEFINE_PER_CPU(struct task_struct *, ksoftirqd);

const char * const softirq_to_name[NR_SOFTIRQS] = {
	"HI", "TIMER", "NET_TX", "NET_RX", "BLOCK", "IRQ_POLL",
	"TASKLET", "SCHED", "HRTIMER", "RCU"
};

2.2 软中断触发机制

要实现“谁触发,谁执行”的思想,就必须为每个CPU都定义它自己的触发和控制变量。为此,Linux在\Linux-5.4\arch\arm\include\asm\hardirq.h头文件中定义了数据结构irq_cpustat_t来描述一个CPU的中断统计信息,其中就有用于触发和控制软中断的成员变量。数据结构irq_cpustat_t的定义如下:

IPI: 处理器间的中断(Inter-Processor Interrupts)

#define NR_IPI	7		//Inter-Processor Interrupts,IPI

typedef struct {
	unsigned int __softirq_pending;
#ifdef CONFIG_SMP
	unsigned int ipi_irqs[NR_IPI];
#endif
} ____cacheline_aligned irq_cpustat_t;

#define __inc_irq_stat(cpu, member)	__IRQ_STAT(cpu, member)++
#define __get_irq_stat(cpu, member)	__IRQ_STAT(cpu, member)

#define __IRQ_STAT(cpu, member) (irq_stat[cpu].member)
//\Linux-5.4\include\linux\irq_cpustat.h文件中
DECLARE_PER_CPU_ALIGNED(irq_cpustat_t, irq_stat);	/* defined in asm/hardirq.h */
#define __IRQ_STAT(cpu, member)	(per_cpu(irq_stat.member, cpu))

//\Linux-5.4\include\linux\percpu-defs.h文字中
/*
 * Normal declaration and definition macros.
 */
#define DECLARE_PER_CPU_SECTION(type, name, sec)			\
	extern __PCPU_ATTRS(sec) __typeof__(type) name

#define DECLARE_PER_CPU_ALIGNED(type, name)				\
	DECLARE_PER_CPU_SECTION(type, name, PER_CPU_ALIGNED_SECTION)	\
	____cacheline_aligned

展开可得到以下定义:

 irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;

其中:

  1. NR_CPUS,为系统中CPU个数。
  2. 这样,每个CPU都只操作它自己的中断统计信息结构。假设有一个编号为id的CPU,那么它只能操作它自己的中断统计信息结构irq_stat[id](0≤id≤NR_CPUS-1),从而使各CPU之间互不影响。

使用open_softirq()函数可以注册软中断对应的处理函数,而raise_softirq()函数可以出发一个软件中断。

//使用softirq机制需要通过open_softirq来注册软中断处理函数,使中断索引号与中断处理函数对应。该函数定义在kernel/softirq.c文件中。
//该函数将软中断的中断处理函数指针赋值给相应的softirq_vec。
//nr为中断号,action为中断处理函数
void open_softirq(int nr, void (*action)(struct softirq_action *))
{
	softirq_vec[nr].action = action;
}
//在中断处理函数完成了紧急的硬件操作后,就应该调用raise_softirq函数来触发软中断,让软中断来处理耗时的中断下半部操作。
//如果没有处于中断中,则立马唤醒线程softirqd
void raise_softirq(unsigned int nr)		//触发一个软中断,nr为中断号
{
	unsigned long flags;

	local_irq_save(flags);
	raise_softirq_irqoff(nr);	//raise_softirq_irqoff来触发相应的软中断,将相应的bit位置位
	local_irq_restore(flags);
}

raise_softirq_irqoff函数:

/*
 * This function must run with irqs disabled!
 */
inline void raise_softirq_irqoff(unsigned int nr)
{
	__raise_softirq_irqoff(nr);

	/*
	 * If we're in an interrupt or softirq, we're done
	 * (this also catches softirq-disabled code). We will
	 * actually run the softirq once we return from
	 * the irq or softirq.
	 *
	 * Otherwise we wake up ksoftirqd to make sure we
	 * schedule the softirq soon.
	 */
	if (!in_interrupt())
		wakeup_softirqd();
}

通过in_interrupt判断现在是否在中断上下文中,或者软中断是否被禁止,如果都不成立,我们必须要调用wakeup_softirqd函数用来唤醒本CPU上的softirqd这个内核线程。

2.3 初始化软中断

//初始化软中断(softirq_init)
//在start_kernel()进行系统初始化中,就调用了softirq_init()函数对HI_SOFTIRQ和TASKLET_SOFTIRQ两个软中断进行了初始化
//Linux-5.4\init\main.c -> softirq_init()
void __init softirq_init(void)
{
	int cpu;

	for_each_possible_cpu(cpu) {
		per_cpu(tasklet_vec, cpu).tail =
			&per_cpu(tasklet_vec, cpu).head;
		per_cpu(tasklet_hi_vec, cpu).tail =
			&per_cpu(tasklet_hi_vec, cpu).head;
	}

	open_softirq(TASKLET_SOFTIRQ, tasklet_action);	//设置软中断服务函数
	open_softirq(HI_SOFTIRQ, tasklet_hi_action);	//设置软中断服务函数
}

2.4 软中断服务的执行及处理

2.4.1 在中断返回现场调度__do_softirq

在中断处理程序中触发软中断是最常见的形式,一个硬件中断处理完成之后。下面的函数在处理完硬件中断之后退出中断处理函数,在irq_exit中会触发软件中断的处理。

中断处理模型:

这里写图片描述

硬中断处理过程:

fastcall unsigned int do_IRQ(struct pt_regs *regs)
{
        ...
        irq_enter();

        //handle external interrupt (ISR)
        ...
          irq_exit();

        return 1;
}

硬中断处理完毕后会执行irq_exit()函数

/*
 * Exit an interrupt context. Process softirqs if needed and possible:
 */
void irq_exit(void)
{
#ifndef __ARCH_IRQ_EXIT_IRQS_DISABLED
	local_irq_disable();
#else
	lockdep_assert_irqs_disabled();
#endif
	account_irq_exit_time(current);
	preempt_count_sub(HARDIRQ_OFFSET);
	if (!in_interrupt() && local_softirq_pending())	//如果没有处于中断上下文中,以及没有标志位,则调用invoke_softirq函数,调用__do_softirq函数,或者唤醒处理软中断的线程。
		invoke_softirq();

	tick_irq_exit();
	rcu_irq_exit();
	trace_hardirq_exit(); /* must be last! */
}

在irq_exit()函数中会调用invoke_softirq函数

static inline void invoke_softirq(void)
{
	if (ksoftirqd_running(local_softirq_pending()))
		return;

	if (!force_irqthreads) {
#ifdef CONFIG_HAVE_IRQ_EXIT_ON_IRQ_STACK
		/*
		 * We can safely execute softirq on the current stack if
		 * it is the irq stack, because it should be near empty
		 * at this stage.
		 */
		__do_softirq();	/* 软中断处理函数 */
#else
		/*
		 * Otherwise, irq_exit() is called on the task stack that can
		 * be potentially deep already. So call softirq in its own stack
		 * to prevent from any overrun.
		 */
		do_softirq_own_stack();	//__do_softirq();
#endif
	} else {
		wakeup_softirqd();/* 如果强制使用软中断线程进行软中断处理,会通知调度器唤醒软中断线程ksoftirqd */
	}
}

函数do_softirq()负责执行数组softirq_vec[i]中设置的软中断服务函数。每个CPU都是通过执行这个函数来执行软中断服务的。由于同一个CPU上的软中断服务例程不允许嵌套,因此,do_softirq()函数一开始就检查当前CPU是否已经正出在中断服务中,如果是则do_softirq()函数立即返回。举个例子,假设CPU0正在执行do_softirq()函数,执行过程产生了一个高优先级的硬件中断,于是CPU0转去执行这个高优先级中断所对应的中断服务程序。众所周知,所有的中断服务程序最后都要跳转到do_IRQ()函数并由它来依次执行中断服务队列中的ISR,这里我们假定这个高优先级中断的ISR请求触发了一次软中断,于是do_IRQ()函数在退出之前看到有软中断请求,从而调用do_softirq()函数来服务软中断请求。因此,CPU0再次进入do_softirq()函数(也即do_softirq()函数在CPU0上被重入了)。但是在这一次进入do_softirq()函数时,它马上发现CPU0此前已经处在中断服务状态中了,因此这一次do_softirq()函数立即返回。于是,CPU0回到该开始时的do_softirq()函数继续执行,并为高优先级中断的ISR所触发的软中断请求补上一次服务。从这里可以看出,do_softirq()函数在同一个CPU上的执行是串行的。

do_softirq函数:

asmlinkage __visible void do_softirq(void)
{
	__u32 pending;
	unsigned long flags;
	/* 判断是否在中断处理中,如果正在中断处理,就直接返回 */
	if (in_interrupt())
		return;
	/* 保存当前寄存器的值 */
	local_irq_save(flags);
	/* 取得当前已注册软中断的位图 */
	pending = local_softirq_pending();

    /* 循环处理所有已注册的软中断 */
	if (pending && !ksoftirqd_running(pending))
		do_softirq_own_stack();		//__do_softirq

	local_irq_restore(flags);
}

__do_softirq函数:

asmlinkage __visible void __softirq_entry __do_softirq(void)
{
	unsigned long end = jiffies + MAX_SOFTIRQ_TIME;	/* 为了防止软中断执行时间太长,设置了一个软中断结束时间 */
	unsigned long old_flags = current->flags;		 /* 保存当前进程的标志 */
	int max_restart = MAX_SOFTIRQ_RESTART;			/* 软中断循环执行次数: 10次 */
	struct softirq_action *h;						/* 软中断的action指针 */
	bool in_hardirq; 
	__u32 pending;
	int softirq_bit;

	/*
	 * Mask out PF_MEMALLOC as the current task context is borrowed for the
	 * softirq. A softirq handled, such as network RX, might set PF_MEMALLOC
	 * again if the socket is related to swapping.
	 */
	current->flags &= ~PF_MEMALLOC;

	pending = local_softirq_pending();			/* 获取此CPU的__softirq_pengding变量值 */
	account_irq_enter_time(current);			/* 用于统计进程被软中断使用时间 */

	__local_bh_disable_ip(_RET_IP_, SOFTIRQ_OFFSET);	/* 增加preempt_count软中断计数器,也表明禁止了调度 */
	in_hardirq = lockdep_softirq_start();

restart:										/* 循环10次的入口,每次循环都会把所有挂起需要执行的软中断执行一遍 */
	/* 该CPU的__softirq_pending清零,当前的__softirq_pending保存在pending变量中 */
    /* 这样做就保证了新的软中断会在下次循环中执行 */
	
	/* Reset the pending bitmask before enabling irqs */
	set_softirq_pending(0);
	
	local_irq_enable();		/* 开中断 */

	h = softirq_vec;		/* h指向软中断数组头 */

	while ((softirq_bit = ffs(pending))) {/* 每次获取最高优先级的已挂起软中断 */
		unsigned int vec_nr;
		int prev_count;

		h += softirq_bit - 1;		/* 获取此软中断描述符地址 */

		vec_nr = h - softirq_vec;	/* 减去软中断描述符数组首地址,获得软中断号 */
		prev_count = preempt_count();	/* 获取preempt_count的值 */

		kstat_incr_softirqs_this_cpu(vec_nr);	/* 增加统计中该软中断发生次数 */

		trace_softirq_entry(vec_nr);
		h->action(h);							/* 执行该软中断的action操作 */
		trace_softirq_exit(vec_nr);
		/* 之前保存的preempt_count并不等于当前的preempt_count的情况处理,也是简单的把之前的复制到当前的preempt_count上,这样做是防止最后软中断计数不为0导致系统不能够执行调度 */
		if (unlikely(prev_count != preempt_count())) {
			pr_err("huh, entered softirq %u %s %p with preempt_count %08x, exited with %08x?\n",
			       vec_nr, softirq_to_name[vec_nr], h->action,
			       prev_count, preempt_count());
			preempt_count_set(prev_count);
		}
		h++;/* h指向下一个软中断,但下个软中断并不一定需要执行,这里只是配合softirq_bit做到一个处理 */
		pending >>= softirq_bit;
	}

	if (__this_cpu_read(ksoftirqd) == current)
		rcu_softirq_qs();
	local_irq_disable();		/* 关中断 */
	/* 循环结束后再次获取CPU的__softirq_pending变量,为了检查是否还有软中断未执行 */
	pending = local_softirq_pending();
	/* 在还有软中断需要执行的情况下,如果时间片没有执行完,并且循环次数也没到10次,继续执行软中断 */
	if (pending) {
		if (time_before(jiffies, end) && !need_resched() &&
		    --max_restart)
			goto restart;
		/* 这里是有软中断挂起,但是软中断时间和循环次数已经用完,通知调度器唤醒软中断线程去执行挂起的软中断,软中断线程是ksoftirqd,这里只起到一个通知作用,因为在中断上下文中是禁止调度的 */
		wakeup_softirqd();
	}

	lockdep_softirq_end(in_hardirq);
	account_irq_exit_time(current);/* 用于统计进程被软中断使用时间 */
	/* 减少preempt_count中的软中断计数器 */
	__local_bh_enable(SOFTIRQ_OFFSET);
	WARN_ON_ONCE(in_interrupt());
	/* 还原进程标志 */
	current_restore_flags(old_flags, PF_MEMALLOC);
}

2.4.2 在守护线程ksoftirq中执行

虽然大部分的softirq是在中断退出的情况下执行,但是有几种情况会在ksoftirq中执行。

  1. 从上文看出,raise_softirq主动触发,而此时正好不是在中断上下文中,ksoftirq进程将被唤醒。
  2. 在irq_exit中执行软中断,但是在经过MAX_SOFTIRQ_RESTART次循环后,软中断还未处理完,这种情况,ksoftirq进程也会被唤醒。

所以加入守护线程这一机制,主要是担心一旦有大量的软中断等待执行,会使得内核过长地留在中断上下文中。

static void wakeup_softirqd(void)
{
	/* Interrupts are disabled: no need to stop preemption */
	struct task_struct *tsk = __this_cpu_read(ksoftirqd);//ksoftirqd这个线程

	if (tsk && tsk->state != TASK_RUNNING)
		wake_up_process(tsk);
}

softirq_threads线程函数为run_ksoftirqd:

//\kernel\softirq.c文件中
static struct smp_hotplug_thread softirq_threads = {
	.store			= &ksoftirqd,
	.thread_should_run	= ksoftirqd_should_run,
	.thread_fn		= run_ksoftirqd,
	.thread_comm		= "ksoftirqd/%u",
};

run_ksoftirqd函数:

static void run_ksoftirqd(unsigned int cpu)
{
	local_irq_disable();
	if (local_softirq_pending()) {
		/*
		 * We can safely run softirq on inline stack, as we are not deep
		 * in the task stack here.
		 */
		__do_softirq();		//最终还是调用这个函数。
		local_irq_enable();
		cond_resched();
		return;
	}
	local_irq_enable();
}

三、小任务机制

tasklet机制是一种较为特殊的软中断。

tasklet一词的原意是“小片任务”的意思,这里是指一小段可执行的代码,且通常以函数的形式出现。软中断向量HI_SOFTIRQ和TASKLET_SOFTIRQ均是用tasklet机制来实现的。

从某种程度上讲,tasklet机制是Linux内核对BH机制的一种扩展。在2.4内核引入了softirq机制后,原有的BH机制正是通过tasklet机制这个桥梁来将softirq机制纳入整体框架中的。正是由于这种历史的延伸关系,使得tasklet机制与一般意义上的软中断有所不同,而呈现出以下两个显著的特点:

  1. 与一般的软中断不同,某一段tasklet代码在某个时刻只能在一个CPU上运行,而不像一般的软中断服务函数(即softirq_action结构中的action函数指针)那样——在同一时刻可以被多个CPU并发地执行。
  2. 与BH机制不同,不同的tasklet代码在同一时刻可以在多个CPU上并发地执行,而不像BH机制那样必须严格地串行化执行(也即在同一时刻系统中只能有一个CPU执行BH函数)。

3.1 tasklet描述符

Linux用数据结构tasklet_struct来描述一个tasklet,每个结构代表一个独立的小任务。该数据结构定义在include/linux/interrupt.h头文件中。如下所示:

/* Tasklets --- multithreaded analogue of BHs.

   Main feature differing them of generic softirqs: tasklet
   is running only on one CPU simultaneously.

   Main feature differing them of BHs: different tasklets
   may be run simultaneously on different CPUs.

   Properties:
   * If tasklet_schedule() is called, then tasklet is guaranteed
     to be executed on some cpu at least once after this.
   * If the tasklet is already scheduled, but its execution is still not
     started, it will be executed only once.
   * If this tasklet is already running on another CPU (or schedule is called
     from tasklet itself), it is rescheduled for later.
   * Tasklet is strictly serialized wrt itself, but not
     wrt another tasklets. If client needs some intertask synchronization,
     he makes it with spinlocks.
 */

struct tasklet_struct
{
	struct tasklet_struct *next;
	unsigned long state;
	atomic_t count;
	void (*func)(unsigned long);
	unsigned long data;
};

其中:

  • next: 指向下一个tasklet的指针;

  • state: 定义了这个tasklet的当前状态。这一个32位的无符号长整数,当前只使用了bit[1]和bit[0]两个状态位。其中,bit[1]=1 表示这个tasklet当前正在某个CPU上被执行,它仅对SMP系统才有意义,其作用就是为了防止多个CPU同时执行一个tasklet的情形出现;bit[0]=1表示这个tasklet已经被调度去等待执行了。

对这两个状态位的宏定义如下所示(interrupt.h):

enum
{
	TASKLET_STATE_SCHED,	/* Tasklet is scheduled for execution */
	TASKLET_STATE_RUN	/* Tasklet is running (SMP only) */
};
  • count: 子计数count,对这个tasklet的引用计数值。

注:只有当count等于0时,tasklet代码段才能执行,也即此时tasklet是被使能的;如果count非零,则这个tasklet是被禁止的。任何想要执行一个tasklet代码段的人都首先必须先检查其count成员是否为0。

  • func:指向以函数形式表现的可执行tasklet代码段。
  • data:函数func的参数。这是一个32位的无符号整数,其具体含义可供func函数自行解释,比如将其解释成一个指向某个用户自定义数据结构的地址值。

Linux在interrupt.h头文件中又定义了两个用来定义tasklet_struct结构变量的辅助宏:

//定义一个tasklet_struct结构体
#define DECLARE_TASKLET(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }

#define DECLARE_TASKLET_DISABLED(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data }

显然,从上述源代码可以看出,用DECLARE_TASKLET宏定义的tasklet在初始化时是被使能的(enabled),因为其count成员为0。而用DECLARE_TASKLET_DISABLED宏定义的tasklet在初始时是被禁止的(disabled),因为其count等于1。

3.2 改变一个tasklet状态的操作

在这里,tasklet状态指两个方面:

  1. state:成员所表示的运行状态;
  2. count:成员决定的使能/禁止状态。

3.2.1 改变一个tasklet的运行状态

state成员中的bit[0]表示一个tasklet是否已被调度去等待执行,bit[1]表示一个tasklet是否正在某个CPU上执行。对于state变量中某位的改变必须是一个原子操作,因此可以用定义在include/asm/bitops.h头文件中的位操作来进行。
  由于bit[1]这一位(即TASKLET_STATE_RUN)仅仅对于SMP系统才有意义,因此Linux在Interrupt.h头文件中显示地定义了对TASKLET_STATE_RUN位的操作。如下所示:

#ifdef CONFIG_SMP
static inline int tasklet_trylock(struct tasklet_struct *t)
{
	return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
}
 
static inline void tasklet_unlock(struct tasklet_struct *t)
{
	smp_mb__before_clear_bit(); 
	clear_bit(TASKLET_STATE_RUN, &(t)->state);
}
 
static inline void tasklet_unlock_wait(struct tasklet_struct *t)
{
	while (test_bit(TASKLET_STATE_RUN, &(t)->state)) { barrier(); }
}
#else
#define tasklet_trylock(t) 1
#define tasklet_unlock_wait(t) do { } while (0)
#define tasklet_unlock(t) do { } while (0)
#endif

显然,在SMP系统同,tasklet_trylock()宏将把一个tasklet_struct结构变量中的state成员中的bit[1]位设置成1,同时还返回bit[1]位的非。因此,如果bit[1]位原有值为1(表示另外一个CPU正在执行这个tasklet代码),那么tasklet_trylock()宏将返回值0,也就表示上锁不成功。如果bit[1]位的原有值为0,那么tasklet_trylock()宏将返回值1,表示加锁成功。而在单CPU系统中,tasklet_trylock()宏总是返回为1。
任何想要执行某个tasklet代码的程序都必须首先调用宏tasklet_trylock()来试图对这个tasklet进行上锁(即设置TASKLET_STATE_RUN位),且只能在上锁成功的情况下才能执行这个tasklet。建议!即使你的程序只在CPU系统上运行,你也要在执行tasklet之前调用tasklet_trylock()宏,以便使你的代码获得良好可移植性。
  在SMP系统中,tasklet_unlock_wait()宏将一直不停地测试TASKLET_STATE_RUN位的值,直到该位的值变为0(即一直等待到解锁),假如:CPU0正在执行tasklet A的代码,在此期间,CPU1也想执行tasklet A的代码,但CPU1发现tasklet A的TASKLET_STATE_RUN位为1,于是它就可以通过tasklet_unlock_wait()宏等待tasklet A被解锁(也即TASKLET_STATE_RUN位被清零)。在单CPU系统中,这是一个空操作。
  宏tasklet_unlock()用来对一个tasklet进行解锁操作,也即将TASKLET_STATE_RUN位清零。在单CPU系统中,这是一个空操作。

3.2.2 使能/禁止一个tasklet

使能与禁止操作往往总是成对地被调用的,tasklet_disable()函数如下(interrupt.h):

static inline void tasklet_disable(struct tasklet_struct *t)
{
	tasklet_disable_nosync(t);
	tasklet_unlock_wait(t);
	smp_mb();
}

函数tasklet_disable_nosync()也是一个静态inline函数,它简单地通过原子操作将count成员变量的值加1。如下所示(interrupt.h):

static inline void tasklet_disable_nosync(struct tasklet_struct *t)
{
	atomic_inc(&t->count);
	smp_mb__after_atomic_inc();
}

函数tasklet_enable()用于使能一个tasklet,如下所示(interrupt.h):

static inline void tasklet_enable(struct tasklet_struct *t)
{
	smp_mb__before_atomic_dec();
	atomic_dec(&t->count);
}

3.3 tasklet描述符的初始化与杀死

函数tasklet_init()用来初始化一个指定的tasklet描述符,其源码如下所示(kernel/softirq.c):

void tasklet_init(struct tasklet_struct *t,
		  void (*func)(unsigned long), unsigned long data)
{
	t->next = NULL;
	t->state = 0;
	atomic_set(&t->count, 0);
	t->func = func;
	t->data = data;
}

函数tasklet_kill()用来将一个已经被调度了的tasklet杀死,即将其恢复到未调度的状态。其源码如下所示(kernel/softirq.c):

void tasklet_kill(struct tasklet_struct *t)
{
	if (in_interrupt())
		pr_notice("Attempt to kill tasklet from interrupt\n");

	while (test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) {
		do {
			yield();
		} while (test_bit(TASKLET_STATE_SCHED, &t->state));
	}
	tasklet_unlock_wait(t);
	clear_bit(TASKLET_STATE_SCHED, &t->state);
}

3.4 tasklet对列

多个tasklet可以通过tasklet描述符中的next成员指针链接成一个单向对列。为此,Linux专门在头文件include/linux/interrupt.h中定义了数据结构tasklet_head来描述一个tasklet对列的头部指针。如下所示:

//\Linux-5.4\kernel\softirq.c文件中
/*
 * Tasklets
 */
struct tasklet_head
{
	struct tasklet_struct *head;
	struct tasklet_struct **tail;
};

尽管tasklet机制是特定于软中断向量HI_SOFTIRQ和TASKLET_SOFTIRQ的一种实现,但是tasklet机制仍然属于softirq机制的整体框架范围内的,因此,它的设计与实现仍然必须坚持“谁触发,谁执行”的思想。为此,Linux为系统中的每一个CPU都定义了一个tasklet对列头部,来表示应该有各个CPU负责执行的tasklet对列。如下所示(kernel/softirq.c):

//\Linux-5.4\include\linux\percpu-defs.h
/*
 * Variant on the per-CPU variable declaration/definition theme used for
 * ordinary per-CPU variables.
 */
#define DECLARE_PER_CPU(type, name)					\
	DECLARE_PER_CPU_SECTION(type, name, "")

#define DEFINE_PER_CPU(type, name)					\
	DEFINE_PER_CPU_SECTION(type, name, "")
//(kernel/softirq.c)
static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);

上面展开,可得:

struct tasklet_head tasklet_vec[NR_CPUS] __cacheline_aligned;
struct tasklet_head tasklet_hi_vec[NR_CPUS] __cacheline_aligned;

其中,tasklet_vec[]数组用于软中断向量TASKLET_SOFTIRQ,而tasklet_hi_vec[]数组则用于软中断向量HI_SOFTIRQ。也即,如果CPUi(0≤i≤NR_CPUS-1)触发了软中断向量TASKLET_SOFTIRQ,那么对列tasklet_vec[i]中的每一个tasklet都将在CPUi服务于软中断向量TASKLET_SOFTIRQ时被CPUi所执行。同样地,如果CPUi(0≤i≤NR_CPUS-1)触发了软中断向量HI_SOFTIRQ,那么队列tasklet_hi_vec[i]中的每一个tasklet都将CPUi在对软中断向量HI_SOFTIRQ进行服务时被CPUi所执行。
  队列tasklet_vec[I]和tasklet_hi_vec[I]中的各个tasklet是怎样被所CPUi所执行的呢?其关键就是软中断向量TASKLET_SOFTIRQ和HI_SOFTIRQ的软中断服务程序——tasklet_action()函数和tasklet_hi_action()函数。下面我们就来分析这两个函数。

3.5 软中断向量TASKLET_SOFTIRQ和HI_SOFTIRQ

Linux为软中断向量TASKLET_SOFTIRQ和HI_SOFTIRQ实现了专用的触发函数和软中断服务函数。

  • 专用的触发函数

tasklet_schedule()函数和tasklet_hi_schedule()函数分别用来在当前CPU上触发软中断向量TASKLET_SOFTIRQ和HI_SOFTIRQ,并把指定的tasklet加入当前CPU所对应的tasklet队列中去等待执行。

  • 专用的软中断服务函数

tasklet_action()函数和tasklet_hi_action()函数则分别是软中断向量TASKLET_SOFTIRQ和HI_SOFTIRQ的软中断服务函数。在初始化函数softirq_init()中,这两个软中断向量对应的描述符softirq_vec[0]和softirq_vec[6]中的action函数指针就被分别初始化成指向函数tasklet_hi_action()和函数tasklet_action()。

3.5.1软中断向量TASKLET_SOFTIRQ的触发函数tasklet_schedule

该函数实现在include/linux/interrupt.h头文件中,是一个inline函数。其源码如下所示:

//Linux-5.4\include\linux\interrupt.h
static inline void tasklet_schedule(struct tasklet_struct *t)
{
	if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
		__tasklet_schedule(t);
}
//\Linux-5.4\kernel\softirq.c
void __tasklet_schedule(struct tasklet_struct *t)
{
	__tasklet_schedule_common(t, &tasklet_vec,
				  TASKLET_SOFTIRQ);
}
//\Linux-5.4\kernel\softirq.c
static void __tasklet_schedule_common(struct tasklet_struct *t,
				      struct tasklet_head __percpu *headp,
				      unsigned int softirq_nr)
{
	struct tasklet_head *head;
	unsigned long flags;

	local_irq_save(flags);
	head = this_cpu_ptr(headp);
	t->next = NULL;
	*head->tail = t;
	head->tail = &(t->next);
	raise_softirq_irqoff(softirq_nr);
	local_irq_restore(flags);
}
  • 调用test_and_set_bit()函数将待调度的tasklet的state成员变量的bit[0]位(也即TASKLET_STATE_SCHED位)设置为1,该函数同时还返回TASKLET_STATE_SCHED位的原有值。因此如果bit[0]为的原有值已经为1,那就说明这个tasklet已经被调度到另一个CPU上去等待执行了。由于一个tasklet在某一个时刻只能由一个CPU来执行,因此tasklet_schedule()函数什么也不做就直接返回了。否则,就继续下面的调度操作。

  • 首先,调用local_irq_save()函数来关闭当前CPU的中断,以保证下面的步骤在当前CPU上原子地被执行。

  • 然后,将待调度的tasklet添加到当前CPU对应的tasklet队列的尾部。

  • 接着,调用raise_softirq_irqoff函数在当前CPU上触发软中断请求TASKLET_SOFTIRQ。

  • 最后,调用local_irq_restore()函数来开当前CPU的中断。

3.5.2软中断向量TASKLET_SOFTIRQ的服务程序tasklet_action

函数tasklet_action()是tasklet机制与软中断向量TASKLET_SOFTIRQ的联系纽带。正是该函数将当前CPU的tasklet队列中的各个tasklet放到当前CPU上来执行的。该函数实现在kernel/softirq.c文件中,其源代码如下:

static __latent_entropy void tasklet_action(struct softirq_action *a)
{
	tasklet_action_common(a, this_cpu_ptr(&tasklet_vec), TASKLET_SOFTIRQ);
}

static void tasklet_action_common(struct softirq_action *a,
				  struct tasklet_head *tl_head,
				  unsigned int softirq_nr)
{
	struct tasklet_struct *list;

	local_irq_disable();
	list = tl_head->head;
	tl_head->head = NULL;
	tl_head->tail = &tl_head->head;
	local_irq_enable();

	while (list) {
		struct tasklet_struct *t = list;

		list = list->next;

		if (tasklet_trylock(t)) {
			if (!atomic_read(&t->count)) {
				if (!test_and_clear_bit(TASKLET_STATE_SCHED,
							&t->state))
					BUG();
				t->func(t->data);
				tasklet_unlock(t);
				continue;
			}
			tasklet_unlock(t);
		}

		local_irq_disable();
		t->next = NULL;
		*tl_head->tail = t;
		tl_head->tail = &t->next;
		__raise_softirq_irqoff(softirq_nr);
		local_irq_enable();
	}
}
  • 首先,在当前CPU关中断的情况下,“原子”地读取当前CPU的tasklet队列头部指针,将其保存到局部变量list指针中,然后将当前CPU的tasklet队列头部指针设置为NULL,以表示理论上当前CPU将不再有tasklet需要执行(但最后的实际结果却并不一定如此,下面将会看到)。
  • 然后,用一个while{}循环来遍历由list所指向的tasklet队列,队列中的各个元素就是将在当前CPU上执行的tasklet。循环体的执行步骤如下:
  • 用指针t来表示当前队列元素,即当前需要执行的tasklet。
  • 更新list指针为list->next,使它指向下一个要执行的tasklet。
  • 用tasklet_trylock()宏试图对当前要执行的tasklet(由指针t所指向)进行加锁,如果加锁成功(当前没有任何其他CPU正在执行这个tasklet),则用原子读函数atomic_read()进一步判断count成员的值。如果count为0,说明这个tasklet是允许执行的,于是:
  1. 先清除TASKLET_STATE_SCHED位;
  2. 然后,调用这个tasklet的可执行函数func;
  3. 调用宏tasklet_unlock()来清除TASKLET_STATE_RUN位 ;
  4. 最后,执行continue语句跳过下面的步骤,回到while循环继续遍历队列中的下一个元素。如果count不为0,说明这个tasklet是禁止运行的,于是调用tasklet_unlock()清除前面用tasklet_trylock()设置的TASKLET_STATE_RUN位。

3.6 tasklet使用总结

1、声明和使用小任务大多数情况下,为了控制一个常用的硬件设备,小任务机制是实现下半部的最佳选择。小任务可以动态创建,使用方便,执行起来也比较快。我们既可以静态地创建小任务,也可以动态地创建它。选择那种方式取决于到底是想要对小任务进行直接引用还是一个间接引用。如果准备静态地创建一个小任务(也就是对它直接引用),使用下面两个宏中的一个:
DECLARE_TASKLET(name,func, data)
DECLARE_TASKLET_DISABLED(name,func, data)
这两个宏都能根据给定的名字静态地创建一个tasklet_struct结构。当该小任务被调度以后,给定的函数func会被执行,它的参数由data给出。这两个宏之间的区别在于引用计数器的初始值设置不同。第一个宏把创建的小任务的引用计数器设置为0,因此,该小任务处于激活状态。另一个把引用计数器设置为1,所以该小任务处于禁止状态。例如:
DECLARE_TASKLET(my_tasklet,my_tasklet_handler, dev);
这行代码其实等价于
struct tasklet_struct my_tasklet = { NULL, 0, ATOMIC_INIT(0),tasklet_handler,dev};
这样就创建了一个名为my_tasklet的小任务,其处理程序为tasklet_handler,并且已被激活。当处理程序被调用的时候,dev就会被传递给它。

2、编写自己的小任务处理程序小任务处理程序必须符合如下的函数类型:

void tasklet_handler(unsigned long data)
由于小任务不能睡眠,因此不能在小任务中使用信号量或者其它产生阻塞的函数。但是小任务运行时可以响应中断。

3、调度自己的小任务通过调用tasklet_schedule()函数并传递给它相应的tasklt_struct指针,该小任务就会被调度以便适当的时候执行:
tasklet_schedule(&my_tasklet); /*把my_tasklet标记为挂起 */
在小任务被调度以后,只要有机会它就会尽可能早的运行。在它还没有得到运行机会之前,如果一个相同的小任务又被调度了,那么它仍然只会运行一次。

可以调用tasklet_disable()函数来禁止某个指定的小任务。如果该小任务当前正在执行,这个函数会等到它执行完毕再返回。调用tasklet_enable()函数可以激活一个小任务,如果希望把以DECLARE_TASKLET_DISABLED()创建的小任务激活,也得调用这个函数,如:

tasklet_disable(&my_tasklet); /小任务现在被禁止,这个小任务不能运行/

tasklet_enable(&my_tasklet); /* 小任务现在被激活*/

也可以调用tasklet_kill()函数从挂起的队列中去掉一个小任务。该函数的参数是一个指向某个小任务的tasklet_struct的长指针。在小任务重新调度它自身的时候,从挂起的队列中移去已调度的小任务会很有用。这个函数首先等待该小任务执行完毕,然后再将它移去。

4、tasklet的简单用法

下面是tasklet的一个简单应用,以模块的形成加载。

#include <linux/module.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <linux/kdev_t.h>
#include <linux/cdev.h>
#include <linux/kernel.h>
#include <linux/interrupt.h>
 
static struct  t asklet_struct my_tasklet;
 
static void tasklet_handler (unsigned long d ata)
{
        printk(KERN_ALERT,"tasklet_handler is running./n");
}
 
static int __init test_init(void)
{
        tasklet_init(&my_tasklet,tasklet_handler,0);
        tasklet_schedule(&my_tasklet);
        return0;
}
 
static  void __exit test_exit(void)
{
        tasklet_kill(&tasklet);
        printk(KERN_ALERT,"test_exit is running./n");
}
MODULE_LICENSE("GPL");
 
module_init(test_init);
module_exit(test_exit);

从这个例子可以看出,所谓的小任务机制是为下半部函数的执行提供了一种执行机制,也就是说,推迟处理的事情是由tasklet_handler实现,何时执行,经由小任务机制封装后交给内核去处理。

四、中断处理的工作队列机制

工作队列(work queue)是另外一种将工作推后执行的形式,它和前面讨论的tasklet有所不同。工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文中执行。这样,通过工作队列执行的代码能占尽进程上下文的所有优势。最重要的就是工作队列允许被重新调度甚至是睡眠。
那么,什么情况下使用工作队列,什么情况下使用tasklet。如果推后执行的任务需要睡眠,那么就选择工作队列;如果推后执行的任务不需要睡眠,那么就选择tasklet。另外,如果需要用一个可以重新调度的实体来执行你的下半部处理,也应该使用工作队列。它是唯一能在进程上下文运行的下半部实现的机制,也只有它才可以睡眠。这意味着在需要获得大量的内存时、在需要获取信号量时,在需要执行阻塞式的I/O操作时,它都会非常有用。如果不需要用一个内核线程来推后执行工作,那么就考虑使用tasklet。

4.1 工作、工作队列

如前所述,我们把推后执行的任务叫做工作(work),描述它的数据结构为work_struct,这些工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct,而工作线程就是负责执行工作队列中的工作。系统默认的工作者线程为events,自己也可以创建自己的工作者线程。表示工作的数据结构用<linux/workqueue.h>中定义的work_struct结构表示:

struct work_struct {
	atomic_long_t data;
	struct list_head entry;	// 连接所有工作的链表
	work_func_t func; 		// 要执行的函数
#ifdef CONFIG_LOCKDEP
	struct lockdep_map lockdep_map;
#endif
};
typedef void (*work_func_t)(struct work_struct *work);

这些结构被连接成链表。当一个工作者线程被唤醒时,它会执行它的链表上的所有工作。工作被执行完毕,它就将相应的work_struct对象从链表上移去。当链表上不再有对象的时候,它就会继续休眠。表示工作队列的数据结构用<kernel/workqueue.c>中定义的workqueue_struct:

/*
 * The externally visible workqueue.  It relays the issued work items to
 * the appropriate worker_pool through its pool_workqueues.
 */
struct workqueue_struct {
	struct list_head	pwqs;		/* WR: all pwqs of this wq */
	struct list_head	list;		/* PR: list of all workqueues */

	struct mutex		mutex;		/* protects this wq */
	int			work_color;	/* WQ: current work color */
	int			flush_color;	/* WQ: current flush color */
	atomic_t		nr_pwqs_to_flush; /* flush in progress */
	struct wq_flusher	*first_flusher;	/* WQ: first flusher */
	struct list_head	flusher_queue;	/* WQ: flush waiters */
	struct list_head	flusher_overflow; /* WQ: flush overflow list */

	struct list_head	maydays;	/* MD: pwqs requesting rescue */
	struct worker		*rescuer;	/* I: rescue worker */

	int			nr_drainers;	/* WQ: drain in progress */
	int			saved_max_active; /* WQ: saved pwq max_active */

	struct workqueue_attrs	*unbound_attrs;	/* PW: only for unbound wqs */
	struct pool_workqueue	*dfl_pwq;	/* PW: only for unbound wqs */

#ifdef CONFIG_SYSFS
	struct wq_device	*wq_dev;	/* I: for sysfs interface */
#endif
#ifdef CONFIG_LOCKDEP
	char			*lock_name;
	struct lock_class_key	key;
	struct lockdep_map	lockdep_map;
#endif
	char			name[WQ_NAME_LEN]; /* I: workqueue name */

	/*
	 * Destruction of workqueue_struct is RCU protected to allow walking
	 * the workqueues list without grabbing wq_pool_mutex.
	 * This is used to dump all workqueues from sysrq.
	 */
	struct rcu_head		rcu;

	/* hot fields used during command issue, aligned to cacheline */
	unsigned int		flags ____cacheline_aligned; /* WQ: WQ_* flags */
	struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
	struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node */
};

4.2 创建推后的工作

4.2.1 静态地创建工作(work_struct)

要使用工作队列,首先要做的是创建一些需要推后完成的工作。可以通过DECLARE_WORK在编译时静态地建该结构:

DECLARE_WORK(name, func); 或者INIT_WORK(_work, _func)

其定义如下:

#define DECLARE_WORK(n, f)					\
	struct work_struct n = __WORK_INITIALIZER(n, f)

举例如下:

static void do_poweroff(struct work_struct *dummy)
{
	kernel_power_off();
}
 
static DECLARE_WORK(poweroff_work, do_poweroff);

即创建了一个全局静态变量:static work_struct poweroff_work,且被初始化了,其执行函数为do_poweroff。

4.2.2 动态初始化工作(work_struct)

先定义一具struct work_struct 变量,在需要使用时调用INIT_WORK进行初始化,然后便可以使用。

#define INIT_WORK(_work, _func)					\
	do {							\
		__INIT_WORK((_work), (_func), 0);		\
	} while (0)

举例:

void __cfg80211_scan_done(struct work_struct *wk)
{
	struct cfg80211_registered_device *rdev;
 
	rdev = container_of(wk, struct cfg80211_registered_device,
			    scan_done_wk);
 
	cfg80211_lock_rdev(rdev);
	___cfg80211_scan_done(rdev, false);
	cfg80211_unlock_rdev(rdev);
}
 
struct cfg80211_registered_device {
 
	struct work_struct scan_done_wk;
	struct work_struct sched_scan_results_wk;
	struct work_struct conn_work;
	struct work_struct event_work;
	struct cfg80211_wowlan *wowlan;
}
struct cfg80211_registered_device *rdev;
rdev = kzalloc(alloc_size, GFP_KERNEL);
 
INIT_WORK(&rdev->scan_done_wk, __cfg80211_scan_done);  // 其执行函数为: __cfg80211_scan_done
INIT_WORK(&rdev->sched_scan_results_wk, __cfg80211_sched_scan_results);

4.3 对工作进行调度

现在工作已经被创建,我们可以调度它了。想要把给定工作的待处理函数提交给缺省的events工作线程,只需调用: int schedule_work(struct work_struct *work);
它把work放入全局工作队列:system_wq,其定义如下:

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * Returns %false if @work was already on the kernel-global workqueue and
 * %true otherwise.
 *
 * This puts a job in the kernel-global workqueue if it was not already
 * queued and leaves it in the same position on the kernel-global
 * workqueue otherwise.
 */
static inline bool schedule_work(struct work_struct *work)
{
	return queue_work(system_wq, work);
}
/* System-wide workqueues which are always present.
 *
 * system_wq is the one used by schedule[_delayed]_work[_on]().
 * Multi-CPU multi-threaded.  There are users which expect relatively
 * short queue flush time.  Don't queue works which can run for too
 * long.
 */
extern struct workqueue_struct *system_wq;

queue_work:把一个工作放入工作队列:

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns %false if @work was already on a queue, %true otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but if the CPU dies
 * it can be processed by another CPU.
 */
static inline bool queue_work(struct workqueue_struct *wq,
			      struct work_struct *work)
{
	return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}

/**
 * queue_work_on - queue work on specific cpu
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @work: work to queue
 *
 * We queue the work to a specific CPU, the caller must ensure it
 * can't go away.
 *
 * Return: %false if @work was already on a queue, %true otherwise.
 */
bool queue_work_on(int cpu, struct workqueue_struct *wq,
		   struct work_struct *work)
{
	bool ret = false;
	unsigned long flags;

	local_irq_save(flags);

	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
		__queue_work(cpu, wq, work);
		ret = true;
	}

	local_irq_restore(flags);
	return ret;
}

把work放入工作队列,work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。有时候并不希望工作马上就被执行,而是希望它经过一段延迟以后再执行。在这种情况下,可以调度它在指定的时间执行:

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
static inline bool schedule_delayed_work(struct delayed_work *dwork,
					 unsigned long delay)
{
	return queue_delayed_work(system_wq, dwork, delay);
}

#define DECLARE_DELAYED_WORK(n, f)					\
	struct delayed_work n = __DELAYED_WORK_INITIALIZER(n, f, 0)

#define INIT_DELAYED_WORK(_work, _func)					\
	__INIT_DELAYED_WORK(_work, _func, 0)

4.4 创建工作者线程

工作放入工作队列之后,由管理此工作队列的工作者来执行这些work,通过alloc_workqueue或create_singlethread_workqueue来创建工作者线程,它最后调用kthread_create创建线程,其线程名为alloc_workqueue中指定的name,其举例如下:

/**
 * workqueue_init_early - early init for workqueue subsystem
 *
 * This is the first half of two-staged workqueue subsystem initialization
 * and invoked as soon as the bare basics - memory allocation, cpumasks and
 * idr are up.  It sets up all the data structures and system workqueues
 * and allows early boot code to create workqueues and queue/cancel work
 * items.  Actual work item execution starts only after kthreads can be
 * created and scheduled right before early initcalls.
 */
int __init workqueue_init_early(void)
{
	int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
	int hk_flags = HK_FLAG_DOMAIN | HK_FLAG_WQ;
	int i, cpu;

	WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));

	BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL));
	cpumask_copy(wq_unbound_cpumask, housekeeping_cpumask(hk_flags));

	pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);

	/* initialize CPU pools */
	for_each_possible_cpu(cpu) {
		struct worker_pool *pool;

		i = 0;
		for_each_cpu_worker_pool(pool, cpu) {
			BUG_ON(init_worker_pool(pool));
			pool->cpu = cpu;
			cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
			pool->attrs->nice = std_nice[i++];
			pool->node = cpu_to_node(cpu);

			/* alloc pool ID */
			mutex_lock(&wq_pool_mutex);
			BUG_ON(worker_pool_assign_id(pool));
			mutex_unlock(&wq_pool_mutex);
		}
	}

	/* create default unbound and ordered wq attrs */
	for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
		struct workqueue_attrs *attrs;

		BUG_ON(!(attrs = alloc_workqueue_attrs()));
		attrs->nice = std_nice[i];
		unbound_std_wq_attrs[i] = attrs;

		/*
		 * An ordered wq should have only one pwq as ordering is
		 * guaranteed by max_active which is enforced by pwqs.
		 * Turn off NUMA so that dfl_pwq is used for all nodes.
		 */
		BUG_ON(!(attrs = alloc_workqueue_attrs()));
		attrs->nice = std_nice[i];
		attrs->no_numa = true;
		ordered_wq_attrs[i] = attrs;
	}

	system_wq = alloc_workqueue("events", 0, 0);
	system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
	system_long_wq = alloc_workqueue("events_long", 0, 0);
	system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
					    WQ_UNBOUND_MAX_ACTIVE);
	system_freezable_wq = alloc_workqueue("events_freezable",
					      WQ_FREEZABLE, 0);
	system_power_efficient_wq = alloc_workqueue("events_power_efficient",
					      WQ_POWER_EFFICIENT, 0);
	system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",
					      WQ_FREEZABLE | WQ_POWER_EFFICIENT,
					      0);
	BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
	       !system_unbound_wq || !system_freezable_wq ||
	       !system_power_efficient_wq ||
	       !system_freezable_power_efficient_wq);

	return 0;
}


/**
 * workqueue_init - bring workqueue subsystem fully online
 *
 * This is the latter half of two-staged workqueue subsystem initialization
 * and invoked as soon as kthreads can be created and scheduled.
 * Workqueues have been created and work items queued on them, but there
 * are no kworkers executing the work items yet.  Populate the worker pools
 * with the initial workers and enable future kworker creations.
 */
int __init workqueue_init(void)
{
	struct workqueue_struct *wq;
	struct worker_pool *pool;
	int cpu, bkt;

	/*
	 * It'd be simpler to initialize NUMA in workqueue_init_early() but
	 * CPU to node mapping may not be available that early on some
	 * archs such as power and arm64.  As per-cpu pools created
	 * previously could be missing node hint and unbound pools NUMA
	 * affinity, fix them up.
	 *
	 * Also, while iterating workqueues, create rescuers if requested.
	 */
	wq_numa_init();

	mutex_lock(&wq_pool_mutex);

	for_each_possible_cpu(cpu) {
		for_each_cpu_worker_pool(pool, cpu) {
			pool->node = cpu_to_node(cpu);
		}
	}

	list_for_each_entry(wq, &workqueues, list) {
		wq_update_unbound_numa(wq, smp_processor_id(), true);
		WARN(init_rescuer(wq),
		     "workqueue: failed to create early rescuer for %s",
		     wq->name);
	}

	mutex_unlock(&wq_pool_mutex);

	/* create the initial workers */
	for_each_online_cpu(cpu) {
		for_each_cpu_worker_pool(pool, cpu) {
			pool->flags &= ~POOL_DISASSOCIATED;
			BUG_ON(!create_worker(pool));
		}
	}

	hash_for_each(unbound_pool_hash, bkt, pool, hash_node)
		BUG_ON(!create_worker(pool));

	wq_online = true;
	wq_watchdog_init();

	return 0;
}

如:cfg80211_wq = create_singlethread_workqueue(“cfg80211”);创建了一个名为cfg80211的kernel线程。

4.5 工作队列的简单应用

#include <linux/module.h> 
#include <linux/init.h> 
#include <linux/workqueue.h> 
static struct workqueue_struct *queue = NULL; 
static struct work_struct work; 
static void work_handler(struct work_struct *data) 
{
  	printk(KERN_ALERT "work handler function.\n"); 
}

static int __init test_init(void) 
{
	queue = create_singlethread_workqueue("helloworld"); 
	/*创建一个单线程的工作队列*/         
	if (!queue)                 
		goto err;         
	INIT_WORK(&work, work_handler);         
	schedule_work(&work);         
	return 0; 
	err:         
	return -1; 
} 
static void __exit test_exit(void) {
    destroy_workqueue(queue); 
} 
    
MODULE_LICENSE("GPL"); 
module_init(test_init); 
module_exit(test_exit);

参考

[1] https://blog.csdn.net/myarrow/article/details/9287169

[2] https://blog.csdn.net/yhb1047818384/article/details/63687126

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Linux中断处理的“下半部”机制 的相关文章

  • 我不明白 execlp() 在 Linux 中如何工作

    过去两天我一直在试图理解execlp 系统调用 但我还在这里 让我直奔主题 The man pageexeclp 将系统调用声明为int execlp const char file const char arg 与描述 execl exe
  • waitpid() 的作用是什么?

    有什么用waitpid 它通常用于等待特定进程完成 或者如果您使用特殊标志则更改状态 基于其进程 ID 也称为pid 它还可用于等待一组子进程中的任何一个 无论是来自特定进程组的子进程还是当前进程的任何子进程 See here http l
  • Linux:如何设置进程的时区?

    我需要设置在 Linux 机器上启动的各个进程的时区 我尝试设置TZ变量 在本地上下文中 但它不起作用 有没有一种方法可以使用与系统日期不同的系统日期从命令行运行应用程序 这可能听起来很愚蠢 但我需要一种sandbox系统日期将被更改的地方
  • 在两次之间每分钟执行一次 Cronjob

    我需要在 crontab 中每分钟运行一个 bash 脚本8 45am and 9 50am每天的 Code 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 8 home pull sh gt ho
  • 使用 Grep 查找两个短语之间的文本块(包括短语)

    是否可以使用 grep 来高亮所有以以下内容开头的文本 mutablePath CGPathCreateMutable 并以以下内容结尾 CGPathAddPath skinMutablePath NULL mutablePath 这两个短
  • 确定我可以向文件句柄写入多少内容;将数据从一个 FH 复制到另一个 FH

    如何确定是否可以将给定数量的字节写入文件句柄 实际上是套接字 或者 如何 取消读取 我从其他文件句柄读取的数据 我想要类似的东西 n how much can I write w handle n read r handle buf n a
  • Fortran gfortran linux 中的“分段错误(核心转储)”错误

    我正在创建一个程序 该程序将分析目录中的文件 fits 然后它将在另一个目录中创建另一个文件 txt 它只是一个转换器 当我尝试执行该程序 编译正常 时 它给了我一条错误消息 程序收到信号 SIGSEGV 分段错误 无效的内存引用 此错误的
  • NUMA 在虚拟内存中是如何表示的?

    有许多资源 https en wikipedia org wiki Non uniform memory access从硬件角度描述NUMA的架构性能影响 http practical tech com infrastructure num
  • 监控子进程的内存使用情况

    我有一个 Linux 守护进程 它分叉几个子进程并监视它们是否崩溃 根据需要重新启动 如果父进程可以监视子进程的内存使用情况 以检测内存泄漏并在超出一定大小时重新启动子进程 那就太好了 我怎样才能做到这一点 您应该能够从 proc PID
  • 在 C++ linux 中将 STRINGS 写入串口

    我知道这个问题遍布互联网 但仍然没有任何东西能让我完全解决这个问题 我想用 C linux 将数据写入 Propeller 板的串行端口 从控制台获取输入时程序运行良好 但是当我向它写入字符串时总是返回 ERROR Invalid comm
  • 捕获实时流量时如何开启纳秒精度?

    如何告诉 libpcap v1 6 2 将纳秒值存储在struct pcap pkthdr ts tv usec 而不是微秒值 捕获实时数据包时 Note This question is similar to How to enable
  • 为什么我可以在 /proc/pid/maps 输出中看到几个相同的段?

    测试在32位Linux上进行 代码如下 int foo int a int b int c a b return c int main int e 0 int d foo 1 2 printf d n d scanf d e return
  • 警告:请求的映像平台 (linux/amd64) 与检测到的主机平台 (linux/arm64/v8) 不匹配

    警告 请求的映像平台 linux amd64 与检测到的主机平台 linux arm64 v8 不匹配 并且未请求特定平台 docker 来自守护程序的错误响应 无法选择具有功能的设备驱动程序 gpu 我在 mac 上尝试运行此命令时遇到此
  • 每个命令都返回“bash:<命令>:找不到命令...”[关闭]

    Closed 这个问题是无法重现或由拼写错误引起 help closed questions 目前不接受答案 我刚刚安装了 Scala 并添加了路径gedit bashrc export SCALA HOME home avijit sca
  • 让 TeXstudio 在 linux mint 中工作:找不到文件“url.sty”。

    刚刚切换到 Linux Mint 以前的顽固 Windows 用户 我在尝试安装 TeXstudio 时遇到一些问题 Sudo apt get install texstudio 给了我一个正确的安装 至少 我是这么认为的 但是当我尝试构建
  • 嵌入式linux编写AT命令

    我在向 GSM 模块写入 AT 命令时遇到问题 当我使用 minicom b 115200 D dev ttySP0 term vt100 时它工作完美 但我不知道如何在 C 代码中做同样的事情 我没有收到任何错误 但模块对命令没有反应 有
  • 使用 posix_spawn 启动进程

    我正在使用以下代码在 Linux 中启动新进程 pid t processID char argV 192 168 1 40 char 0 int status 1 status posix spawn processID home use
  • Linux中使用管道进行进程间通信

    我已经编写了在 linux 中写入数字以进行管道传输的代码 如下所示 但显示错误 任何人都可以帮助我解决这个问题 基本上该程序的问题陈述如下 一个程序将打开一个管道 向管道写入一个数字 其他程序将打开同一管道 读取数字并打印它们 关闭两个管
  • 配置:错误:无法运行C编译的程序

    我正在尝试使用 Debian Wheezy 操作系统在我的 Raspberry Pi 上安装不同的软件 当我运行尝试配置软件时 我尝试安装我得到此输出 checking for C compiler default output file
  • Linux“屏幕”的 Windows 等效项还是其他替代方案?

    我正在寻找一种在 Windows 环境中控制程序的方法 我希望它与 Linux 软件有点相似 screen 我搜索的原因是我需要使用标识符启动一个程序 在 Windows 上 这样我以后就能够关闭该特定程序 而无需关闭其他任何程序 即使实际

随机推荐

  • 给定两个字符串str1和str2,查找str2在str1中出现的位置

    给定string str1和string str2 xff0c 编写一个库函数 xff0c 返回str2在str1中的位置 如 xff1a str1为 34 ABCDLANCEXYZ 34 xff0c str2为 34 LANCE 34 x
  • 中国交通标志检测数据集

    版权声明 xff1a 本文为转发问 xff0c 原文见博客 https blog csdn net dong ma article details 84339007 中国交通标志检测数据集 xff08 CCTSDB xff09 来源于 A
  • CentOS 修改ls目录的颜色

    修改ls目录的颜色 linux系统默认目录颜色是蓝色的 xff0c 在黑背景下看不清楚 xff0c 可以通过以下2种方法修改ls查看的颜色 bash profile文件在用的根目录下 xff0c ls al可以看到 方法一 xff1a 1
  • Tiny210(S5PV210) U-BOOT(六)----DDR内存配置

    上次讲完了Nand Flash的低级初始化 xff0c 然后Nand Flash的操作主要是在board init f nand xff0c 中 xff0c 涉及到将代码从Nand Flash中copy到DDR中 xff0c 这个放到后面实
  • NAND FLASH命名规则

    基于网络的一个修订版 三星的pure nandflash xff08 就是不带其他模块只是nandflash存储芯片 xff09 的命名规则如下 xff1a 1 Memory K 2 NANDFlash 9 3 Small Classifi
  • s3c6410 DMA

    S3C6410中DMA操作步骤 xff1a 1 决定使用安全DMAC SDMAC 还是通用DMAC DMAC xff1b 2 开始相应DMAC的系统时钟 xff0c 并关闭另外一组的时钟 xff08 系统默认开启SDMA时钟 xff09 x
  • Visual Studio和VS Code的区别

    1 Visual Studio简介 xff1a 是一个集成开发环境 IDE xff0c 安装完成后就能直接用 xff0c 编译工具 xff0c 调试工具 xff0c 各个语言的开发工具 xff0c 都是已经配置好的 xff0c 开箱即用 适
  • 博客转移

    由于CSDN 文章不间断的会丢失图片 xff0c 然后逼格也不够高 xff0c 导致几年都没有写博客 xff0c 全部是记录至印象笔记中 xff0c 但是久了也不太好 xff0c 所以最近搞了一个自己的个人博客 xff0c 以后文章全部写至
  • 安装qt-everywhere-opensource-src-4.8.6

    1 下载 qt everywhere opensource src 4 8 6 http mirrors hust edu cn qtproject official releases qt 4 8 4 8 6 qt everywhere
  • CentOS6.5上安装qt-creator-opensource-linux-x86-3.1.2.run

    1 qt creator opensource linux x86 3 1 2 run的下载 wget http mirrors hustunique com qt official releases qtcreator 3 1 3 1 2
  • atoi()函数

    atoi 函数 原型 xff1a int atoi xff08 const char nptr xff09 用法 xff1a include lt stdlib h gt 功能 xff1a 将字符串转换成整型数 xff1b atoi 会扫描
  • ubuntu中printk打印信息

    1 设置vmware添加seria port 使用文件作为串口 2 启动ubuntu xff0c 修改 etc default grub GRUB CMDLINE LINUX DEFAULT 61 34 34 GRUB CMDLINE LI
  • 静态库、共享库、动态库概念?

    通常库分为 xff1a 静态库 共享库 xff0c 动态加载库 下面分别介绍 一 静态库 xff1a 1 概念 xff1a 静态库就是一些目标文件的集合 xff0c 以 a结尾 静态库在程序链接的时候使用 xff0c 链接器会将程序中使用
  • 链表——怎么写出正确的链表?

    链表 相比数组 xff0c 链表不需要一块连续的内存空间 xff0c 而是通过指针将一组零散的内存块串联起来使用 xff0c 而这里的内存块就叫做节点 xff0c 一般节点除了保存data还会保存下一个节点的地址也就是指针 单链表 头节点
  • 【STM32】STM32 变量存储在片内FLASH的指定位置

    在这里以STM32L4R5为例 xff08 官方出的DEMO板 xff09 xff0c 将变量存储在指定的片内FLASH地址 xff08 0x081F8000 xff09 一 MDK Keil软件操作 uint8 t version spa
  • 【STM32】 利用paho MQTT&WIFI 连接阿里云

    ST联合阿里云推出了云接入的相关培训 xff08 基于STM32的端到端物联网全栈开发 xff09 xff0c 所采用的的板卡为NUCLEO L4R5ZI板 xff0c 实现的主要功能为采集温湿度传感器上传到阿里云物联网平台 xff0c 并
  • 【STM32 】通过ST-LINK utility 实现片外FLASH的烧写

    目录 前言 一 例程参考及讲解 1 1 Loader Src c文件 1 2 Dev Inf c文件 二 程序修改 三 实测 参考 前言 在单片机的实际应用中 xff0c 通常会搭载一些片外FLASH芯片 xff0c 用于存储系统的一些配置
  • 基于FFmpeg的推流器(UDP推流)

    一 推流器对于输入输出文件无要求 1 输入文件可为网络流URL xff0c 可以实现转流器 2 将输入的文件改为回调函数 xff08 内存读取 xff09 的形式 xff0c 可以推送内存中的视频数据 3 将输入文件改为系统设备 xff08
  • 十进制转十六进制的C语言实现

    include lt stdio h gt include lt stdlib h gt include lt string h gt void reversestr char source char target unsigned int
  • Linux中断处理的“下半部”机制

    前言 中断分为硬件中断 xff0c 软件中断 中断的处理原则主要有两个 xff1a 一个是不能嵌套 xff0c 另外一个是越快越好 在Linux中 xff0c 分为中断处理采用 上半部 和 下半部 处理机制 一 中断处理 下半部 机制 中断