JUC-02-生产者消费者

JUC-02-生产者消费者

mark

1. Synchronized 版本

双线程案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.zhuuu.PC;


/*
线程之间的通信问题:
线程交替执行: A B 操作同一个变量 num = 0
A num + 1
B num - 1
*/

public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
}
}



// 资源类
// 等待 业务 通知
class Data{
private int number = 0;

// + 1
public synchronized void increment() throws InterruptedException {
// 等待 业务 通知
if (number != 0){
// 等待操作
this.wait();
}
number ++;
System.out.println(Thread.currentThread().getName()+"--->"+number);
// 通知线程 + 1完毕
this.notifyAll();
}



// - 1
public synchronized void decrement() throws InterruptedException {
// 等待 业务 通知
if (number == 0){
this.wait();
}
number --;
System.out.println(Thread.currentThread().getName()+"--->"+number);
// 通知线程 - 1完毕
this.notifyAll();
}
}

存在的问题(如果增加线程):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();

new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();

mark

1.1 虚假唤醒问题

产生的原因:虚假唤醒问题

在java.lang.object (wait()方法)

mark

解决方法:if 改为 while

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 资源类
// 等待 业务 通知
class Data{
private int number = 0;

// + 1
public synchronized void increment() throws InterruptedException {
// 等待 业务 通知
while (number != 0){
// 等待操作
this.wait();
}
number ++;
System.out.println(Thread.currentThread().getName()+"--->"+number);
// 通知线程 + 1完毕
this.notifyAll();
}



// - 1
public synchronized void decrement() throws InterruptedException {
// 等待 业务 通知
while (number == 0){
this.wait();
}
number --;
System.out.println(Thread.currentThread().getName()+"--->"+number);
// 通知线程 - 1完毕
this.notifyAll();
}
}

2. JUC 版本

Condition 接口:

mark

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 例如,假设我们有一个有限的缓冲区,它支持put和take方法。 如果在一个空的缓冲区尝试一个take ,则线程将阻塞直到一个项目可用; 如果put试图在一个完整的缓冲区,那么线程将阻塞,直到空间变得可用。 我们希望在单独的等待集中等待put线程和take线程,以便我们可以在缓冲区中的项目或空间可用的时候使用仅通知单个线程的优化。 这可以使用两个Condition实例来实现。 

class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];
int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally { lock.unlock(); }
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally { lock.unlock(); }
}
}

和之前的对比:

mark

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package com.zhuuu.PC;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class B {
public static void main(String[] args) {

Data2 data2 = new Data2();

new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data2.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data2.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();

new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data2.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();

new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data2.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}



// 资源类
// 等待 业务 通知
class Data2{
private int number = 0;

Lock lock = new ReentrantLock();

// Condition取代了对象监视器
Condition condition = lock.newCondition();

// + 1
public void increment() throws InterruptedException {
lock.lock();

try {
while (number != 0){
// 等待操作
condition.await();
}
number ++;
System.out.println(Thread.currentThread().getName()+"--->"+number);
// 通知线程 + 1完毕
condition.signalAll();
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}



// - 1
public synchronized void decrement() throws InterruptedException {
lock.lock();
try {
// 等待 业务 通知
while (number == 0){
condition.await();
}
number --;
System.out.println(Thread.currentThread().getName()+"--->"+number);
// 通知线程 - 1完毕
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}

现在有了新的需求:

之前:A,B,C,D 四条线程是无序的

现在 要求 按照A B C D的顺序唤醒线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.zhuuu.PC;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// 用于生产线 : 下单 = 》 支付 = 》 到货

public class C {
public static void main(String[] args) {

Data3 data3 = new Data3();

new Thread(()->{
for (int i = 0; i < 5; i++) {
data3.printA();
}
},"A").start();

new Thread(()->{
for (int i = 0; i < 5; i++) {
data3.printB();
}
},"B").start();

new Thread(()->{
for (int i = 0; i < 5; i++) {
data3.printC();
}
},"C").start();
}
}


// 资源类
class Data3{
// 要求 A 执行完 调用 B 执行完 调用 C

private Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
private int number = 1;

public void printA(){
lock.lock();
// 判断 业务 通知
try {
while (number != 1){
// 等待
condition1.await();
}
System.out.println(Thread.currentThread().getName()+"->A");
// 唤醒指定的人
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void printB(){
lock.lock();
try {
while (number != 2){
condition2.await();
}

// 唤醒3
number = 3;
condition3.signal();

} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void printC(){
lock.lock();
try {
while (number != 3){
condition3.await();
}

// 唤醒A
number = 1;
condition1.signal();

} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  • © 2019-2022 Zhuuu
  • PV: UV:

请我喝杯咖啡吧~

支付宝
微信