程序:静态的代码
进程:运行中的程序
线程:进程的进一步细分,程序的一条执行路径
实现多线程:
1.继承Thread类
2.实现Runnable接口
以下是关系到线程运行状态的几个方法:
1)start方法
start()用来启动一个线程,当调用start方法后,系统才会开启一个新的线程来执行用户定义的子任务,在这个过程中,会为相应的线程分配需要的资源。
2)run方法
run()方法是不需要用户来调用的,当通过start方法启动一个线程之后,当线程获得了CPU执行时间,便进入run方法体去执行具体的任务。注意,继承Thread类必须重写run方法,在run方法中定义具体要执行的任务。
3)sleep方法
sleep相当于让线程睡眠,交出CPU,让CPU去执行其他的任务。
实现java.lang.Runnable接口
用Runnable也是非常常见的一种,我们只需要重写run方法即可。
class Thread1 extends Thread{
private String name;
public Thread1(String name) {
this.name=name;
}
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(name + "运行 : " + i);
try {
sleep((int) Math.random() * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Main {
public static void main(String[] args) {
Thread1 mTh1=new Thread1("A");
Thread1 mTh2=new Thread1("B");
mTh1.start();
mTh2.start();
}
}
class Thread2 implements Runnable{
private String name;
public Thread2(String name) {
this.name=name;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(name + "运行 : " + i);
try {
Thread.sleep((int) Math.random() * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Main {
public static void main(String[] args) {
new Thread(new Thread2("C")).start();
new Thread(new Thread2("D")).start();
}
}
Thread和Runnable的区别:
对比一下继承的方式 vs 实现的方式
1.联系:public class Thread implements Runnable(继承的方式的Thread也实现了Runnable接口)
2.哪个方式好?
实现的方式优于继承的方式 why?
① 避免了java单继承的局限性
② 如果多个线程要操作同一份资源(或数据),更适合使用实现的方式
//模拟火车站售票窗口,开启三个窗口售票,总票数为100张
//存在线程的安全问题
class Window extends Thread {
int ticket = 100;
public void run() {
while (true) {
if (ticket > 0) {
System.out.println(Thread.currentThread().getName() + "售票,票号为:"+ ticket--);
} else {
break;
}
}
}
}
public class TestWindow {
public static void main(String[] args) {
Window w1 = new Window();
Window w2 = new Window();
Window w3 = new Window();
w1.setName("窗口1");
w2.setName("窗口2");
w3.setName("窗口3");
w1.start();
w2.start();
w3.start();
}
}
//会卖出300张票
class Window implements Runnable {
int ticket = 100;//要将全局变量声明为静态,不然每个对象都有这个属性,会卖出300张票
public void run() {
while (true) {
if (ticket > 0) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "售票,票号为:"+ ticket--);
} else {
break;
}
}
}
}
public class Main {
//模拟火车站售票窗口,开启三个窗口售票,总票数为100张
//存在线程的安全问题
public static void main(String[] args) {
Window w1 = new Window();
Thread t1 = new Thread(w1, "t1");
Thread t2 = new Thread(w1, "t2");
Thread t3 = new Thread(w1, "t3");
t1.start();
t2.start();
t3.start();
}
}
问题原因:
某个线程执行完输出ticket后,还没有来得及ticket--,CPU时间片被分配给了另外一个线程,导致同一个票号被输出2次。
另外一种情况,打印到ticket=1时,有2个线程同时进入到了条件里,导致-1的票号被输出。
解决办法,为关键代码段,加锁,参见后文。
中断线程
当线程的run方法执行方法体中最后一条语句后,并经由return语句返回时,或者出现了在方法中过没有捕获的异常时,线程将被终止。
线程同步
根据各线程访问数据的次序,可能会产生讹误的对象。这样的一个情况称为竞争条件(race condition)。
1)竞争条件的一个例子
银行例程:多线程操作时,本应恒等的余额总值发生了变化。
public class SynchBankTest
{
public static final int NACCOUNTS = 100;
public static final double INITIAL_BALANCE = 1000;
public static void main(String[] args)
{
Bank b = new Bank(NACCOUNTS, INITIAL_BALANCE);
int i;
for (i = 0; i < NACCOUNTS; i++)
{
TransferRunnable r = new TransferRunnable(b, i, INITIAL_BALANCE);
Thread t = new Thread(r);
t.start();
}
}
}
public class Bank
{
private final double[] accounts;
public Bank(int n, double initialBalance)
{
accounts = new double[n];
for (int i = 0; i < accounts.length; i++)
accounts[i] = initialBalance;
}
public void transfer(int from, int to, double amount) throws InterruptedException
{
if (accounts[from] >= amount) {
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
}
}
public double getTotalBalance()
{
double sum = 0;
for (double a : accounts)
sum += a;
return sum;
}
public int size()
{
return accounts.length;
}
}
public class TransferRunnable implements Runnable
{
private Bank bank;
private int fromAccount;
private double maxAmount;
private int DELAY = 10;
public TransferRunnable(Bank b, int from, double max)
{
bank = b;
fromAccount = from;
maxAmount = max;
}
public void run()
{
try
{
while (true)
{
int toAccount = (int) (bank.size() * Math.random());
double amount = maxAmount * Math.random();
bank.transfer(fromAccount, toAccount, amount);
Thread.sleep((int) (DELAY * Math.random()));
}
}
catch (InterruptedException e)
{
}
}
}
2)详解竞争条件
假定两个线程同时执行指令
accounts[to] += amount;
这不是原子操作。该指令可能被处理如下:
1)将accounts[to]加载到寄存器
2)增加amount
3)将结果写回accounts[to]
假定第一个线程执行步骤1和2,然后,它被剥夺了运行权。假定第二个线程被唤醒并修改了accounts数组中的同一项。然后,第一个线程被唤醒并完成其第三步。这一动作擦去了第二个线程所做的更新。
线程同步
当使用多个线程来访问同一个数据时,非常容易出现线程安全问题(比如多个线程都在操作同一数据导致数据不一致),所以我们用同步机制来解决这些问题。
锁和条件的关键之处:
锁用来保护代码片段,任意时刻只能有一个线程执行被保护的代码。
锁可以管理试图进入保护代码片段的线程
锁可以拥有一个或者多个相关的条件对象
每个条件对象管理那些已经进入被保护的代码段但还不能运行的线程。
锁对象
在Java SE5.0引入ReentrantLock类。Lock是Java.util.concurrent.locks包下的接口,Lock 实现提供了比使用synchronized 方法和语句可获得的更广泛的锁定操作。
用ReentrantLock保护代码块的基本结构如下:
myLock.lock(); //a ReentrantLock object
try
{
critical section
}
finally
{
myLock.unlock();//确保代码抛出异常锁必须被释放
}
这一结构确保任何时刻只有一个线程进入临界区。一旦一个线程封锁了锁对象,其他任何线程都无法通过lock语句。当其他线程调用lock时,他们被阻塞,直到第一个线程释放锁对象。把解锁操作放在finally子句之内是至关重要的。如果在临界区的代码抛出异常,锁必须释放。否则,其他线程将永远阻塞。
public class Bank
{
private Lock bankLock= new ReentrantLock();
public void transfer(int from, int to, double amount)
{
bankLock.lock();
try
{
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
}
finally
{
bankLock.unlock();
}
}
public double getTotalBalance()
{
bankLock.lock();
try
{
double sum = 0;
for (double a : accounts)
sum += a;
return sum;
}
finally
{
bankLock.unlock();
}
}
}
假定一个线程调用transfer,在执行结束前被剥夺运行权。假定第二个线程也调用了transfer,由于第二个线程并不能获得锁,将在调用lock方法时被阻塞。他必须等待第一个线程完成transfer方法之后才能再度被激活。当第一个线程释放锁时,第二个线程才能开始运行。这样余额就不会出错了。
每个Bank对象有自己的ReentrantLock对象,如果两个线程试图访问同一个Bank对象,那么锁以串行方式提供服务。但是,如果两个线程访问不同的Bank对象,每个线程得到不同的锁对象,两个线程都不会发生阻塞。两个线程在操作不同的Bank实例的时候,线程之间不会相互影响。
锁是可重入的,因为线程可以重复的获得已经持有的锁。锁保持一个持有计数来跟踪对Lock方法的嵌套调用。线程每一次调用都用unlock来释放锁。由于这一特性,被一个锁保护的代码可以调用另一个使用相同锁的方法。例如,transfer方法调用getTotalBalance方法,这也会封锁bankLock对象,此时bankLock对象的持有计数为2,当getTotalBalance方法退出时,持有计数变为1,当transfer方法退出时,持有计数变为0。线程释放锁。通常,可能想要保护需若干个操作来更新或者检查共享对象的代码块。要确保这些操作完成后,另一个线程才能使用相同的对象。
要留心临界区的代码,不要因为异常的抛出二跳出了临界区。如果在临界区代码结束之前抛出了异常,finally字句释放锁,但会使对象可能出于一种受损状态。
条件对象
通常,线程进入临界区,却发现在某一条件满足后才能执行。要使用一个条件对象来管理那些已经获得了一个锁,但是不能做有用工作的线程。(条件对象经常被称为条件变量)
分析上文中的银行模拟程序,
if(bank.getBalance(from)>=amount)
transfer(from, to, amount);
如果当前程序通过if条件判断,且在调用transfer之前被中断,在线程再次运行前,账户余额可能已经低于提款金额。必须确保没有其他线程在本检查余额与转账活动之间修改余额。通过使用锁来保护检查与转账动作来做到这一点:
public void transfer(int from, int to, double amount)
{
bankLock.lock();
try
{
while (accounts[from] < amount)
{
//wait()
}
//transfer funds
........
finally
{
bankLock.unlock();
}
}
现在,当账户中没有足够的余额时,等待直到另一个线程向账户中注入资金。但是,这一线程刚刚获得了bankLock的排他性访问,因此别的线程没有进行存款操作的机会,这就是为什么需要用条件对象的原因。
一个锁对象可以有一个或者多个相关的条件对象。可以用newCondition方法获得一个条件对象。习惯的给每一个条件对象命名为可以反应它所表达条件的名字。如sufficientFunds = bankLock.newCondition();
如果transfer方法发现余额不足,它调用sufficientFunds.await();当前线程被他阻塞了,并放弃锁。我们希望这样可以等待另一个线程进行增加账户余额的操作。
等待获得锁的线程和调用await方法的线程在本质上存在不同。一旦一个线程调用await方法,他进入该条件的等待集。当该锁可用时,该线程不能马上解除阻塞。相反,它处于阻塞状态,直到另一个线程调用同一条件上的signalAll方法时为止。
当另一个线程转账时,它应该调用sufficientFunds.signalAll();这一调用重新激活因为这一条件等待的所有线程。当这些线程从等待集当中移除时,他们再次成为可运行的,调度器将再次激活它们。同时,他们试图重新进入该对象。一旦锁成为可用,他们将从await调用返回,获得该锁并从被阻塞的地方继续执行。
此时,线程应该再次测试该条件。由于无法确保该条件被满足,signalAll方法仅仅通知正在等待的线程:此时有可能已经满足条件,值得再次去检测条件。
最关重要的是最终需要某个其他线程调用signalAll方法。当一个线程调用await时,他没有办法自己激活自身,它寄希望于其他线程。如果没有其他线程重新来激活等待的线程,他就永远不再运行。导致死锁。如果所有其他线程被阻塞,最后一个线程再解除其他阻塞线程之前就调用await,那么它也被阻塞。没有线程解除其他阻塞线程,那么该程序就挂起。
应该何时调用signalAll呢,在本例中,当一个账户的余额发生改变时,等待的线程就有机会检查余额。调用signalAll不会立即激活一个等待线程。它仅仅解除等待线程的阻塞,以便这些线程可以在当前线程同步推出后,通过竞争实现对对象的访问。当一个线程拥有某个条件的锁时,它仅仅可以在该条件上调用await,signalAll和signal方法。
package synch;
import java.util.concurrent.locks.*;
public class Bank
{
private final double[] accounts;
private Lock bankLock;
private Condition sufficientFunds;
public Bank(int n, double initialBalance)
{
accounts = new double[n];
for (int i = 0; i < accounts.length; i++)
accounts[i] = initialBalance;
bankLock = new ReentrantLock();
sufficientFunds = bankLock.newCondition();
}
public void transfer(int from, int to, double amount) throws InterruptedException
{
bankLock.lock();
try
{
while (accounts[from] < amount) {
sufficientFunds.await();
}
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
sufficientFunds.signalAll();
}
finally
{
bankLock.unlock();
}
}
public double getTotalBalance()
{
bankLock.lock();
try
{
double sum = 0;
for (double a : accounts)
sum += a;
return sum;
}
finally
{
bankLock.unlock();
}
}
public int size()
{
return accounts.length;
}
}
[java] view plain copy
synch/TransferRunnable.java
package synch;
public class TransferRunnable implements Runnable
{
private Bank bank;
private int fromAccount;
private double maxAmount;
private int DELAY = 10;
public TransferRunnable(Bank b, int from, double max)
{
bank = b;
fromAccount = from;
maxAmount = max;
}
public void run()
{
try
{
while (true)
{
int toAccount = (int) (bank.size() * Math.random());
double amount = maxAmount * Math.random();
bank.transfer(fromAccount, toAccount, amount);
Thread.sleep((int) (DELAY * Math.random()));
}
}
catch (InterruptedException e)
{
}
}
}
package synch;
public class SynchBankTest
{
public static final int NACCOUNTS = 100;
public static final double INITIAL_BALANCE = 1000;
public static void main(String[] args)
{
Bank b = new Bank(NACCOUNTS, INITIAL_BALANCE);
int i;
for (i = 0; i < NACCOUNTS; i++)
{
TransferRunnable r = new TransferRunnable(b, i, INITIAL_BALANCE);
Thread t = new Thread(r);
t.start();
}
}
}
消费者和生产者:
public class Consumer implements Runnable {
Buffer buffer;
public Consumer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
while (true)
{
buffer.take();
try {
Thread.sleep((int)(Math.random()*20));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Producer implements Runnable {
Buffer buffer;
public Producer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
while (true)
{
buffer.put();
try {
Thread.sleep((int)(Math.random()*10));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Buffer {
private List<Date> storage;
private int maxSize;
private Lock lock;
private Condition notFull;//不满--通知生产者继续生产
private Condition notEmpty;//不空--通知消费者继续消费
public Buffer(int size){
//使用锁lock,并且创建两个condition,相当于两个阻塞队列
lock=new ReentrantLock();
notFull=lock.newCondition();
notEmpty=lock.newCondition();
maxSize=size;
storage=new LinkedList<>();
}
//生产者线程调用
public void put()
{
lock.lock();
if(storage.size() >= maxSize)//说明队列已满
{
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
else
{
Date date = new Date();
storage.add(date);
System.out.println(Thread.currentThread().getName() + "当前集合大小" + storage.size());
System.out.println("生产了一个日期"+date.toLocaleString());
notEmpty.signalAll();//通知其他的消费者线程,去消费
}
lock.unlock();
}
//消费者线程调用
public Date take()
{
lock.lock();
Date date = null;
if(storage.size()==0)
{
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
else
{
date = storage.get(storage.size()-1);
storage.remove(date);
System.out.println(Thread.currentThread().getName() + "当前集合大小" + storage.size());
System.out.println("消费时间"+ date.toLocaleString());
notFull.signalAll();
}
lock.unlock();
return date;
}
}
public class Main {
public static void main(String[] args) {
Buffer buffer = new Buffer(20);
Producer producer = new Producer(buffer);
Consumer consumer = new Consumer(buffer);
Consumer consumer2 = new Consumer(buffer);
new Thread(producer,"生产者").start();
new Thread(consumer,"消费者1号").start();
new Thread(consumer2,"消费者2号").start();
}
}
synchronized关键字
等价于方法里加锁
public synchronized void transfer(){
wait();
System.out.println("");
notifyAll();
}
public void transfer(){
sufficientFunds.await();
System.out.println("");
sufficientFunds.signalAll();
}
阻塞队列
LinkedBlockingQueue
方法 正常动作 特殊情况下的动作
add 添加一个元素 队列满时抛出IllegalStateException异常
element 返回队列的头元素 队列空时抛出NoSuchElementException异常
offer 添加一个元素并返回true 如果队列满,则返回false
peek 返回队列的头元素 如果队列空,则返回null
poll 移出并返回队列的头元素 如果队列空,则返回null
put 添加一个元素 如果队列满,则阻塞
remove 移出并返回头元素 队列空时抛出NoSuchElementException异常
take 移出并返回头元素 如果队列空,则阻塞
Queue<String> queue = new ArrayDeque<>();
queue.add("zhangsan");
queue.add("lisi");
System.out.println(queue.remove());
System.out.println(queue.remove());
---->zhangsan lisi
Stack<String> stack = new Stack<>();
stack.push("zhangsan");
stack.push("lisi");
String string = stack.pop();
System.out.println(string);
string = stack.pop();
System.out.println(string);
-----> lisi zhangsan
许多线程问题可以通过使用一个或多个队列以优雅且安全的方式将其形式化。生产者线程向队列插入元素,消费者线程则取出它们。使用队列,可以安全地从一个线程向另一个线程传递数据。例如,转账程序中,转账线程将转账指令对象插入一个队列中,而不是直接访问银行对象。另一个线程从队列中取出指令执行转账。只有该线程可以访问该银行对象的内部。因此不需要同步。(当然,线程安全的队列类的实现者不能不考虑锁和条件。)
当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列(BlockingQueue)导致线程阻塞。队列会自动地平衡负载。
LinkedBlockingQueue的容量在默认下是没有上边界的,也可设置之。
class FilePicker implements Runnable
{
private String path;
private BlockingQueue<File> blockingQueue;
public FilePicker(String path, BlockingQueue<File> blockingQueue) {
this.path = path;
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
File file = new File(path);//"C:\\Users\\ttc\\IdeaProjects\\FileOperDemo\\src\\com\\company"
if(!file.exists())
{
System.out.println("error path");
return;
}
try {
ListFiles(file);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
blockingQueue.put(TestBlockingQueue.END_FLAG);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("FilePicker byebye");
}
//打印出path目录下全部的文件名
public void ListFiles(File path) throws InterruptedException {
Thread.sleep(100);
File[] files = path.listFiles();
for(File file : files)
{
if(file.isDirectory())
{
ListFiles(file);
}
else
{
try {
blockingQueue.put(file);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
class FileAnalyzer implements Runnable
{
private String word;
private BlockingQueue<File> blockingQueue;
public FileAnalyzer(String word, BlockingQueue<File> blockingQueue) {
this.word = word;
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
File file = null;
while (file!=TestBlockingQueue.END_FLAG)
{
try {
file = blockingQueue.take();
if(file != TestBlockingQueue.END_FLAG)
{
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream(file);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
Scanner scanner = new Scanner(fileInputStream);
int lineno = 1;
while (scanner.hasNext())
{
String string = scanner.nextLine();
if(string.contains(word))
{
System.out.println("在"+file.getName()+"找到了字符串" + word + "在第" + lineno + "行:" + string);
}
lineno++;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
blockingQueue.put(TestBlockingQueue.END_FLAG);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("FileAnalyzer byebye");
}
}
public class TestBlockingQueue {
public static final String ROOT_DIR = "C:\\Users\\ttc\\IdeaProjects\\FileOperDemo\\src\\com\\company";
public static final String KEY_WORD = "int ";
public static final int THREAD_COUNT = 3;
public static final int Q_SIZE = 10;
public static final File END_FLAG = new File("");
public static void main(String[] args) {
BlockingQueue<File> q = new LinkedBlockingQueue<File>(Q_SIZE);
FilePicker aFilePicker = new FilePicker(ROOT_DIR,q);
new Thread(aFilePicker).start();
for(int i=0; i< THREAD_COUNT; i++)
{
FileAnalyzer analyzer = new FileAnalyzer(KEY_WORD,q);
new Thread(analyzer).start();
}
}
}
public class ScannerTest {
public static void main(String[] args) throws FileNotFoundException {
File file = new File("d:/abc.txt");
FileInputStream fileInputStream = new FileInputStream(file);
Scanner scanner = new Scanner(fileInputStream);
while (scanner.hasNext())
{
String string = scanner.nextLine();
System.out.println(string);
}
}
}