`
berdy
  • 浏览: 509030 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Netty 4 源码分析——EventExecutor

阅读更多
先从EventExecutor开始,因为它是一个很基础的工具类,是对I/O线程的包装。先了解下它的源码会对后面的分析有更好的理解。



先看下EventExecutor的类关系图,这里只是简单的画出了类和接口的继承和实现关系,还有其他的聚合关系没有画出来,为的是便于分析思路的清晰。

说到Executor,很容易联想到jdk中 java.util.concurrent.Executor 接口,这个接口非常简单,就一个方法
void execute(Runnable command);

从方法签名上就能看出这个是为了支持异步模式的。command表示一个命令。当前线程就是命令者角色,Executor内部的去运行Runnable的线程就是执行者。这里没有提供明确的地方供命令者去取得命令的执行结果。

ExecutorService 继承了Executor 接口,增加了对自身生命周期管理的方法,同时提供了一个Future给命令者去获取命令的执行结果。

ScheduledExecutorService 继承了ExecutorService接口,增加了对定时任务的支持。

EventExecutorGroup 继承了ScheduledExecutorService接口,对原来的ExecutorService的关闭接口提供了增强,提供了优雅的关闭接口。从接口名称上可以看出它是对多个EventExecutor的集合,提供了对多个EventExecutor的迭代访问接口。

EventExecutor 继承EventExecutorGroup 看着这个关系真心有些纠结啊。不过细想下还是能理解的。A是B中的一员,但是A也能迭代访问B中的其他成员。这个继承关系支持了迭代访问这个行为。自然的他提供了一个parent接口,来获取所属的EventExecutorGroup 。另外提供了inEventLoop 方法支持查询某个线程是否在EventExecutor所管理的线程中。还有其他一些创建Promise和Future的方法。

AbstractEventExecutor 只是对EventExecutor中某些方法的简单实现

下面重点分析下非常有意思SingleThreadEventExecutor,它也是个抽象类,但是提供了很多重要方法的实现。弄清楚了这个对整个EventExecutor体系都非常有帮助。从类名上可知里面只有一个线程,先搞清楚一个线程的处理过程再理解多线程的就轻松些了。

先从execute方法入口分析
@Override
public void execute(Runnable task) {
	if (task == null) {
		throw new NullPointerException("task");
	}

	boolean inEventLoop = inEventLoop();
	if (inEventLoop) {
		addTask(task);
	} else {
		startThread();
		addTask(task);
		if (isShutdown() && removeTask(task)) {
			reject();
		}
	}

	if (!addTaskWakesUp) {
		wakeup(inEventLoop);
	}
}

先不看这个源码,我们分析下,作为一个Executor,A让你执行命令A,B让你执行命令B。。。命令不是说执行就能执行的吧。总得有个地方保存还没来得及执行的命令吧,总得有个先来后到吧。而这个地方又会被多线程访问,得保证多线程访问可见性,操作的原子性。jdk中提供的BlockingQueue就是为此而生的。BlockingQueue是一个接口,jdk中有很多他的实现。SingleThreadEventExcutor中提供的这个地方叫tasksQueue,类型是jdk中的LinkedBlockingQueue。上面代码中的addTask就是往tasksQueue中添加。

另外SingleThreadEventExcutor实现了ScheduledExecutorService 接口,支持执行定时任务。得有个地方存放定时任务信息。类中的实现是delayedTaskQueue,它是一个PriorityQueue ,也是一个BlockingQueue。不过它里面的元素不是按照先来后到的顺序存取的,而是按照各个元素的优先级判断的。

因为这个execute方法,是可以在外部线程调用,也可以在内部线程调用。也就是说外部成员可以给你下命令,内部成员也可以给你下命令。所以在上面的代码中先调用inEventLoop判断当前下命令的是外部的还是内部的。
如果是外部的,先确定内部线程是否启动,没启动就先启动内部线程同时给自己加一个定时清理的定时任务。这个从下面的代码中可以看出
private void startThread() {
	synchronized (stateLock) {
		if (state == ST_NOT_STARTED) {
			state = ST_STARTED;
			delayedTaskQueue.add(new ScheduledFutureTask<Void>(
					this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
					ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
			thread.start();
		}
	}
}

private final class PurgeTask implements Runnable {
	@Override
	public void run() {
		Iterator<ScheduledFutureTask<?>> i = delayedTaskQueue.iterator();
		while (i.hasNext()) {
			ScheduledFutureTask<?> task = i.next();
			if (task.isCancelled()) {
				i.remove();
			}
		}
	}
}

添加命令完成了,下面就看如何去执行命令了,这个就需要分析下内部线程的执行逻辑了。SingleThreadEventExecutor类中有一个实力变量Thread,它引用的就是当前Executor所拥有的那个thread对象。
thread = threadFactory.newThread(new Runnable() {
	@Override
	public void run() {
		boolean success = false;
		updateLastExecutionTime();
		try {
			SingleThreadEventExecutor.this.run();
			success = true;
		} catch (Throwable t) {
			logger.warn("Unexpected exception from an event executor: ", t);
		} finally {
			// 更改状态
			if (state < ST_SHUTTING_DOWN) {
				state = ST_SHUTTING_DOWN;
			}

			// Check if confirmShutdown() was called at the end of the loop.
			// 这里说明在try块中调用的SingleThreadEventExecutor.this.run();中在方法结束之前必须调用confirmShutdown方法,这个在其之类实现的run方法中得到验证
			if (success && gracefulShutdownStartTime == 0) {
				logger.error(
						"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
						SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
						"before run() implementation terminates.");
			}

			try {
				// Run all remaining tasks and shutdown hooks.
				// 确保tasksQueue和shutDownHooks中的runable都处理完成了,这里的处理完成有可能是超时了
				for (;;) {
					if (confirmShutdown()) {
						break;
					}
				}
			} finally {
				try {
					cleanup();
				} finally {
					synchronized (stateLock) {
						state = ST_TERMINATED;
					}
					//释放信号量,使用Semaphore(0)来让另一个线程一直等待,知道内部线程调用了release()
					threadLock.release();
					if (!taskQueue.isEmpty()) {
							logger.warn(
									"An event executor terminated with " +
									"non-empty task queue (" + taskQueue.size() + ')');
						}

						terminationFuture.setSuccess(null);
					}
				}
			}
		}
	});

上面Thread内部run方法执行的是SingleThreadEventExecutor.this.run(),而这个run方法是一个抽象方法,留给了子类去实现了。不过可以肯定的是子类的run方法是不断的去tasksQueue中取出task去执行。现在重点分析下finally块中的代码。
1、首先更改状态为正在关闭状态。
2、如果子类中的run方法中的loop执行成功了,就得先调用confirmShutdown,确认任务队列中的任务是否都已经被执行了。
3、然后还得再次确认下任务队列中是否已被执行完毕,因为在关闭的过程中外部也是能添加任务的。
4、最终执行清理工作,更改状态为已关闭,释放信号量。
5、如果这个时候还是有任务没执行完,那也只能是无奈了,记个log吧
6、更新整个关闭过程为success

再分析下confirmShutdown,看看是如何保证所有的task执行完成的呢
protected boolean confirmShutdown() {
	// 如果state状态 state < ST_SHUTTING_DOWN则直接return false
	if (!isShuttingDown()) {
		return false;
	}
	// 这个方法必须从内部调用,从修饰符 protected也可以看出
	if (!inEventLoop()) {
		throw new IllegalStateException("must be invoked from an event loop");
	}
	// 取消所有的定时任务
	cancelDelayedTasks();
	
	if (gracefulShutdownStartTime == 0) {
		// 标记shutdown处理的开始时间
		gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
	}
	// 运行tasksQueue或者shutdownHooks中的所有Runnable都处理完成
	if (runAllTasks() || runShutdownHooks()) {
		//分析了下源码,isShutdown()这个只能是在外部线程调用了shutdown()接口的时候才会有可能成为true
		//但是现在这个方法已经@Deprecated,所以这个if块是不会进入的
		if (isShutdown()) {
			// shutdown 成功,没有更多的runnable需要执行
			return true;
		}

		// There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period.
		wakeup(true);
		return false;
	}

	final long nanoTime = ScheduledFutureTask.nanoTime();
	// runAllTasks() 或者runAllTasks() + runShutdownHooks()方法执行时间操作了最大限制
	if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
		return true;
	}
	// 现在时间与上个任务执行完成的时间差小于quietPeriod时间,继续检测
	if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
		// Check if any tasks were added to the queue every 100ms.
		// TODO: Change the behavior of takeTask() so that it returns on timeout.
		wakeup(true);
		try {
			//内部线程sleep 100ms
			Thread.sleep(100);
		} catch (InterruptedException e) {
			// Ignore
		}

		return false;
	}

	// No tasks were added for last quiet period - hopefully safe to shut down.
	// (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
	return true;
}

后面再看看这个类中的一些内部变量
private static final Runnable WAKEUP_TASK = new Runnable() {
	@Override
	public void run() {
		// Do nothing.
	}
};

这个WAKEUP_TASK什么也不做,为啥取名wakeup呢?我看源码也没太明白。有人理解的给解释下吧
private final Semaphore threadLock = new Semaphore(0);

threadLock的内部permits设置为0,也就是说acquire()永远获取不到permit,会一直被阻塞着。那有什么用呢?另一种实现wait()/notify()吧

值得注意的是类中是如何来控制定时任务的呢?秘密在这个方法中
private void fetchFromDelayedQueue() {
	long nanoTime = 0L;
	for (;;) {
		ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
		if (delayedTask == null) {
			break;
		}

		if (nanoTime == 0L) {
			nanoTime = ScheduledFutureTask.nanoTime();
		}

		if (delayedTask.deadlineNanos() <= nanoTime) {
			delayedTaskQueue.remove();
			taskQueue.add(delayedTask);
		} else {
			break;
		}
	}
}

ScheduledFutureTask类中有个变量记录这个类被加载进内存中的时间
private static final long START_TIME = System.nanoTime();

static long nanoTime() {
	return System.nanoTime() - START_TIME;
}
// 返回到期时间,到期时间在构造函数中指定了
public long deadlineNanos() {
	return deadlineNanos;
}
// 这个方法决定了排序的优先级
@Override
public int compareTo(Delayed o) {
	if (this == o) {
		return 0;
	}

	ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
	long d = deadlineNanos() - that.deadlineNanos();
	if (d < 0) {
		return -1;
	} else if (d > 0) {
		return 1;
	} else if (id < that.id) {
		return -1;
	} else if (id == that.id) {
		throw new Error();
	} else {
		return 1;
	}
}


所以fetchFromDelayedQueue()方法的逻辑就是先取出即将到期的task,判断是否已经到期,若已经到期就加入到tasksQueue中,等到被执行。

先分析到这里,后面在补上。
有什么不正确的,也请大家指正。
  • 大小: 23.7 KB
1
0
分享到:
评论
1 楼 fatherican 2016-08-22  
写的好。

相关推荐

Global site tag (gtag.js) - Google Analytics