Java-基础-线程中断

Java-基础-线程中断

1. 概述

Thread提供了interrupt方法,中断线程的执行:

  • 如果线程堵塞在object.wait、Thread.join和Thread.sleep,将会抛出InterruptedException,同时清除线程的中断状态;
  • 如果线程堵塞在java.nio.channels.InterruptibleChannel的IO上,Channel将会被关闭,线程被置为中断状态,并抛出java.nio.channels.ClosedByInterruptException;
  • 如果线程堵塞在java.nio.channels.Selector上,线程被置为中断状态,select方法会马上返回,类似调用wakeup的效果;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 核心 interrupt 方法
public void interrupt() {
if (this != Thread.currentThread()) // 非本线程,需要检查权限
checkAccess();

synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // 仅仅设置interrupt标志位
b.interrupt(this); // 调用如 I/O 操作定义的中断方法
return;
}
}
interrupt0();
}
// 静态方法,这个方法有点坑,调用该方法调用后会清除中断状态。
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
// 这个方法不会清除中断状态
public boolean isInterrupted() {
return isInterrupted(false);
}
// 上面两个方法会调用这个本地方法,参数代表是否清除中断状态
private native boolean isInterrupted(boolean ClearInterrupted);

2. 源码实现

  • 之前在分析Thread.start的时候曾经提到,JavaThread有三个成员变量:
1
2
3
4
5
6
7
//用于synchronized同步块和Object.wait() 
ParkEvent * _ParkEvent ;
//用于Thread.sleep()
ParkEvent * _SleepEvent ;
//用于unsafe.park()/unpark(),供java.util.concurrent.locks.LockSupport调用,
//因此它支持了java.util.concurrent的各种锁、条件变量等线程同步操作,是concurrent的实现基础
Parker* _parker;
  • 初步猜测interrupt实现应该与此有关系;
    interrupt方法的源码也在jvm.cpp文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");

// Ensure that the C++ Thread and OSThread structures aren't freed before we operate
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
// We need to re-resolve the java_thread, since a GC might have happened during the
// acquire of the lock
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
Thread::interrupt(thr);
}
JVM_END
  • JVM_Interrupt对参数进行了校验,然后直接调用Thread::interrupt:
1
2
3
4
5
void Thread::interrupt(Thread* thread) {
trace("interrupt", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
os::interrupt(thread);
}
  • Thread::interrupt调用os::interrupt方法实现,os::interrupt方法定义在os_linux.cpp:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");

//获取系统native线程对象
OSThread* osthread = thread->osthread();

if (!osthread->interrupted()) {
osthread->set_interrupted(true);
//内存屏障,使osthread的interrupted状态对其它线程立即可见
OrderAccess::fence();
//前文说过,_SleepEvent用于Thread.sleep,线程调用了sleep方法,则通过unpark唤醒
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}

//_parker用于concurrent相关的锁,此处同样通过unpark唤醒
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
//synchronized同步块和Object.wait() 唤醒
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;

}

由此可见,interrupt其实就是通过ParkEvent的unpark方法唤醒对象;另外要注意:

  • object.wait、Thread.sleep和Thread.join会抛出InterruptedException并清除中断状态;
  • Lock.lock()方法不会响应中断,Lock.lockInterruptibly()方法则会响应中断并抛出异常,区别在于park()等待被唤醒时lock会继续执行park()来等待锁,而 lockInterruptibly会抛出异常;
  • synchronized被唤醒后会尝试获取锁,失败则会通过循环继续park()等待,因此实际上是不会被interrupt()中断的;
  • 一般情况下,抛出异常时,会清空Thread的interrupt状态,在编程时需要注意;

3. 网络中断相关

1
2
3
4
5
6
7
8
9
10
private volatile Interruptible blocker;
private final Object blockerLock = new Object();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
  • 其中blocker是Thread的成员变量,Thread提供了blockedOn方法可以设置blocker:
1
2
3
4
5
void blockedOn(Interruptible b) {
synchronized (blockerLock) {
blocker = b;
}
}
  • 如果一个nio通道实现了InterruptibleChannel接口,就可以响应interrupt()中断,其原理就在InterruptibleChannel接口的抽象实现类AbstractInterruptibleChannel的方法begin()中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
synchronized (closeLock) {
if (!open)
return;
open = false;
interrupted = target;
try {
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) { }
}
}};
}
blockedOn(interruptor);//设置当前线程的blocker为interruptor
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}

protected final void end(boolean completed)
throws AsynchronousCloseException
{
blockedOn(null);//设置当前线程的blocker为null
Thread interrupted = this.interrupted;
//如果发生中断,Thread.interrupt方法会调用Interruptible的interrupt方法,
//设置this.interrupted为当前线程
if (interrupted != null && interrupted == Thread.currentThread()) {
interrupted = null;
throw new ClosedByInterruptException();
}
if (!completed && !open)
throw new AsynchronousCloseException();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//Class java.nio.channels.Channels.WritableByteChannelImpl
public int write(ByteBuffer src) throws IOException {
......
try {
begin();
out.write(buf, 0, bytesToWrite);
finally {
end(bytesToWrite > 0);
}
......
}

//Class java.nio.channels.Channels.ReadableByteChannelImpl
public int read(ByteBuffer dst) throws IOException {
......
try {
begin();
bytesRead = in.read(buf, 0, bytesToRead);
finally {
end(bytesRead > 0);
}
......
}
  • 以上述代码为例,nio通道的ReadableByteChannel每次执行阻塞方法read()前,都会执行begin(),把Interruptible回调接口注册到当前线程上。当线程中断时,Thread.interrupt()触发回调接口Interruptible关闭io通道,导致read方法返回,最后在finally块中执行end()方法检查中断标记,抛出ClosedByInterruptException;

Selector的实现类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//java.nio.channels.spi.AbstractSelector
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread ignore) {
AbstractSelector.this.wakeup();
}};
}
AbstractInterruptibleChannel.blockedOn(interruptor);
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
protected final void end() {
AbstractInterruptibleChannel.blockedOn(null);
}
//sun.nio.ch.class EPollSelectorImpl
protected int doSelect(long timeout) throws IOException {
......
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
......
}

可以看到当发生中断时会调用wakeup方法唤醒poll方法,但并不会抛出中断异常;

4. 通过interrupted()关闭线程

  • 总所周知,Thread.stop, Thread.suspend, Thread.resume 都已经被废弃了。因为它们太暴力了,是不安全的,这种暴力中断线程是一种不安全的操作,举个栗子来说明其可能造成的问题:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ThreadTest {
public static void main(String[] args) throws InterruptedException {
StopThread thread = new StopThread();
thread.start();
// 休眠1秒,确保线程进入运行
Thread.sleep(1000);
// 暂停线程
thread.stop();
// thread.interrupt();
// 确保线程已经销毁
while (thread.isAlive()) { }
// 输出结果
thread.print();
}

private static class StopThread extends Thread {

private int x = 0; private int y = 0;

@Override
public void run() {
// 这是一个同步原子操作
synchronized (this) {
++x;
try {
// 休眠3秒,模拟耗时操作
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
++y;
}
}

public void print() {
System.out.println("x=" + x + " y=" + y);
}
}
}

上述代码中,run方法里是一个同步的原子操作,x和y必须要共同增加,然而这里如果调用thread.stop()方法强制中断线程,输出如下:

1
x=1 y=0

没有异常,也破坏了我们的预期。如果这种问题出现在我们的程序中,会引发难以预期的异常。因此这种不安全的方式很早就被废弃了。取而代之的是interrupt(),上述代码如果采用thread.interrupt()方法,输出结果如下:

1
2
3
4
x=1 y=1
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at ThreadTest$StopThread.run(ThreadTest.java:28)

x=1,y=1 这个结果是符合我们的预期,同时还抛出了个异常,这个异常下文详说。

interrupt() 它基于「一个线程不应该由其他线程来强制中断或停止,而是应该由线程自己自行停止。」思想,是一个比较温柔的做法,它更类似一个标志位。其实作用不是中断线程,而是「通知线程应该中断了」,具体到底中断还是继续运行,应该由被通知的线程自己处理。

interrupt() 并不能真正的中断线程,这点要谨记。需要被调用的线程自己进行配合才行。也就是说,一个线程如果有被中断的需求,那么就需要这样做:

  1. 在正常运行任务时,经常检查本线程的中断标志位,如果被设置了中断标志就自行停止线程。
  2. 在调用阻塞方法时正确处理InterruptedException异常。(例如:catch异常后就结束线程。)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 核心 interrupt 方法
public void interrupt() {
if (this != Thread.currentThread()) // 非本线程,需要检查权限
checkAccess();

synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // 仅仅设置interrupt标志位
b.interrupt(this); // 调用如 I/O 操作定义的中断方法
return;
}
}
interrupt0();
}
// 静态方法,这个方法有点坑,调用该方法调用后会清除中断状态。
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
// 这个方法不会清除中断状态
public boolean isInterrupted() {
return isInterrupted(false);
}
// 上面两个方法会调用这个本地方法,参数代表是否清除中断状态
private native boolean isInterrupted(boolean ClearInterrupted);

首先讲 interrupt() 方法:

  1. interrupt 中断操作时,非自身打断需要先检测是否有中断权限,这由jvm的安全机制配置;
  2. 如果线程处于sleep, wait, join等状态,那么线程将立即退出被阻塞状态,并抛出一个InterruptedException异常;
  3. 如果线程处于I/O阻塞状态,将会抛出ClosedByInterruptException(IOException的子类)异常;
  4. 如果线程在Selector上被阻塞,select方法将立即返回;
  5. 如果非以上情况,将直接标记 interrupt 状态;

注意:interrupt 操作不会打断所有阻塞,只有上述阻塞情况才在jvm的打断范围内,如处于锁阻塞的线程,不会受 interrupt 中断;

等待情况下中断,抛出异常后线程恢复非中断状态,即 interrupted = false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ThreadTest {

public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(new Task("1"));
t.start();
t.interrupt();
}

static class Task implements Runnable{
String name;

public Task(String name) {
this.name = name;
}

@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("thread has been interrupt!");
}
System.out.println("isInterrupted: " + Thread.currentThread().isInterrupted());
System.out.println("task " + name + " is over");
}
}
}

输出:

1
2
3
thread has been interrupt!
isInterrupted: false
task 1 is over

调用Thread.interrupted() 方法后线程恢复非中断状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ThreadTest {

public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(new Task("1"));
t.start();
t.interrupt();
}

static class Task implements Runnable{
String name;

public Task(String name) {
this.name = name;
}

@Override
public void run() {
System.out.println("first :" + Thread.interrupted());
System.out.println("second:" + Thread.interrupted());
System.out.println("task " + name + " is over");
}
}
}

输出结果:

1
2
3
first :true
second:false
task 1 is over

上述两种隐含的状态恢复操作,是符合常理的,因为线程标记为中断后,用户没有真正中断线程,必然将其恢复为false。理论上Thread.interrupted()调用后,如果已中断,应该执行退出操作,不会重复调用。

打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  • © 2019-2022 Zhuuu
  • PV: UV:

请我喝杯咖啡吧~

支付宝
微信