Java并发


实现并发的最好的方式就是在操作系统级别使用进程

多任务操作系统可以周期性地将处理器从一个进程切换到另一个进程,来实现同时运行多个进程

线程:一个线程就是一个进程中单一的顺序控制流

程序的多个任务指的就是进程中的多个线程,所谓并发对于单处理器的计算就是,就是把CPU轮流给每个任务分配使用时间(切分CPU时间),多处理器则是多个处理器同时运行

1. 基本线程机制

1.1 定义任务

线程驱动任务,利用Runnable接口描述任务

concurency/LiftOff.java

import  java.lang.Runnable;
import java.util.concurrent.CountDownLatch;

public class LiffOff implements Runnable{
    protected int countDown = 10;
    private static int taskCount = 0;
    /**
     * 用来区分任务的多个实例
     */
    private final int id = taskCount++;
    public LiffOff(){}
    public LiffOff(int countDown){
        this.countDown = countDown;
    }
    public String status(){
        return "#"+id+"("+(countDown>0?countDown:"LifftOff!") + "). ";
    }
    public void run(){
        while(countDown-- >0){
            System.out.println(status());
            /**
             * 线程调度器,表示切换到其他线程
             */
            Thread.yield();
        }
    }
}

Thread.yield表示线程调度器(表示已经执行玩生命周期的最重要的部分,此时可以切换给其他任务执行)

concurrency/MainThread.java

package concurrency;

public class MainThread {
    public static void main(String[] args){
        LiftOff launch = new LiftOff();
        launch.run();
    }
}

image-20220213095655170

Runnable导出一个类的时候,必须具有run方法(无任何特殊之处,一般附在其他线程上)

1.2 Thread类

把Runnable对象转换成工作任务的传统方式就是把他提交给一个Thread构造器

public class BasicThread {
    public static void main(String[] args){
        Thread t = new Thread(new LiffOff());
        /**
         * 该线程必须执行的初始化操作,在另一个线程产生了对LiffOff.run()的调用,不会影响主线程
         */
        t.start();
        System.out.println("Waiting for LiftOff");
    }
}

image-20220213100640087

  • 可以看到main线程和LiftOff线程同时执行

添加线程:

public class BasicThread {
    public static void main(String[] args){
        Thread t = new Thread(new LiffOff());
        /**
         * 产生多个线程
         */
        for(int i=0;i<5;i++){
            new Thread(new LiffOff()).start();
        }
        System.out.println("Waiting for LiftOff");
    }
}
#0(9) .
#1(9) .
#2(9) .
Waiting for LiftOff
#4(9) .
#3(9) .
#0(8) .
#1(8) .
#2(8) .
#4(8) .
#3(8) .
#0(7) .
#1(7) .
#2(7) .
#4(7) .
#3(7) .
#0(6) .
#1(6) .
#2(6) .
#4(6) .
#3(6) .

image-20220213101225062

如果机器存在多个处理器,那么线程调度器会在多个处理器之间默默分发线程

  • 注意,这里可以尝试吧前面的thread.yield去掉,可以发现线程有序输出(JDK1.8),交晚的java版本可能不会有序,是因为晚期的java版本更擅长去对cpu时间切片。
  • Thread对象在创建的时候可普通对象一样,main主程序结束之后直接清理,但是对于使用的Thread,在run没有结束死亡之前,垃圾回收机制是不会清理的
  • 多个线程和main线程同时运行

1.3 使用Executor(客户端与任务之间的中介)

Executor 在客户端和任务执行之间提供了一个间接层,与客户端执行任务不同,这个中间对象将执行任务

ExecutorService(具有生命周期的Exeutor)知道如何让构建恰当的上下文来执行Runnable对象

import java.util.concurrent.*;
public class CachedThreadPool {
    public static void main(String[] args){
        /**
         * 单个的Executor用来创建和管理系统中的所有任务
         */
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i=0;i<5;i++){
            exec.execute(new LiffOff());
        }
        /**
         * 防止新的任务被提交给这个Executor
         */
        exec.shutdown();
    }
}
  • 单个的Executor被用来创建和管理系统的所有任务

替换成不同类型的executor

package concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPool {
    public static void main(String[] args){
        //constructor argment is number of threads:
        ExecutorService exec = Executors.newFixedThreadPool(5);//5个线程
        for(int i = 0;i<5;i++){
            exec.execute(new LiftOff());
        }
        exec.shutdown();
    }
}
  • FixedThreadPool,可以一次性预先执行代价高昂的线程分配,对于需要线程的事件处理器,通过直接从池中获取线程,FixedThreadPool使用的Thread对象的数量是有限的
  • SingleThreadExecutor就像是线程数量为1的FixedThreadPool,可用在希望在一个线程连续运行的任务的情况,所有提交给这个线程的任务都会按顺序执行

image-20220213104911322

1.4 从任务中返回值

Runnable只是执行任务不返回任何值

通过实现Callable接口可以利用起类型参数的范型调用call()返回对象类型的值,并且必须使用submit调用

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

class TaskWithResult implements Callable<String>{
    private int id;
    public TaskWithResult(int Id){
        this.id = Id;
    }
    public String call(){
        return "result of taskWithResult " + id;
    }
}
public class CallableDemo {
    public static void main(String[] args){
        ExecutorService exec = Executors.newCachedThreadPool();
        ArrayList<Future<String>> result = new ArrayList<Future<String>>();
        for(int i=0;i<10;i++){
            /**
             * submit()会返回Future对象,用Callable返回的特定类型进行结果的参数化
             */
            result.add(exec.submit(new TaskWithResult(i)));
        }
        for(Future<String> i : result){
            try{
                System.out.println(i.get());
            }catch (InterruptedException e){
                System.out.println(e);
                return;
            }catch (ExecutionException e){
                System.out.println(e);
                return;
            }finally {
                exec.shutdown();
            }
        }
    }
}

1.5 休眠

package concurrency;
import java.util.concurrent.*;
public class SleepintTask extends LiftOff{
    public void run(){
        try{
            while(countDown-->0){
                System.out.println(status());
                TimeUnit.MILLISECONDS.sleep(100);//每次执行完输出之后,都会进行休眠阻塞
            }

        }catch(InterruptedException e){
            System.err.println("Intrerupted");
        }
    }
    public static void main(String[] args){
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0;i< 5;i++){
            exec.execute(new SleepintTask());
        }
        exec.shutdown();
    }
}

*因为进行了休眠阻塞,这样线程调度器就会执行切换到另一个线程

没有sleep的情况:

image-20220213115415824

sleep的情况:

image-20220213115459221

1.6 优先级

线程调度器更倾向于优先级高的线程先执行

  • 通过getPriority()读取现有线程的优先级,通过setPriority()设置优先级
package concurrency;
import java.util.concurrent.*;
public class SimplePriority implements Runnable{
    private int countDown = 5;
    private volatile double d;
    private int Priority;
    public SimplePriority(int priority){
        this.Priority = priority;
    }
    public String toString(){
        return Thread.currentThread() + " : " + countDown;
    }
    public void run(){
        Thread.currentThread().setPriority(Priority);
        while(true){
            for(int i = 1;i < 10000000;i++){
                d += (Math.PI + Math.E)/(double)i;
                if(i % 1000 ==0){
                    Thread.yield();
                }
            }
            System.out.println(this);
            if(--countDown == 0) return;
        }
    }
public static void main(String[] args){
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i=0;i<5;i++)
            exec.execute(new SimplePriority(Thread.MIN_PRIORITY));
        exec.execute(new SimplePriority(Thread.MAX_PRIORITY));
        exec.shutdown();
    }
}
  • 优先级高的先被执行

1.7 让步

  • 实现方法就是Thread.yield()

1.8 后台线程

所有非后台线程结束时,程序也就终止了,也会杀掉所有的后台线程,反过来,只要所有非后台线程还在运行,程序就没有终止

import java.util.concurrent.TimeUnit;

public class SimpleDaemons implements Runnable{
    public void run(){
        try{
            while(true){
                TimeUnit.MILLISECONDS.sleep(100);
                System.out.println(Thread.currentThread() + " "+ this);
            }
        }catch (InterruptedException e){
            System.out.println("sleep Interrupted");
        }
    }
    public static void main(String[] args) throws Exception{
        for(int i=0;i<10;i++){
            Thread daemon=new Thread(new SimpleDaemons());
            /**
             * 设置线程为后台线程
             */
            daemon.setDaemon(true);
            daemon.start();
        }
        System.out.println("All Thread Done");
        TimeUnit.MILLISECONDS.sleep(150);
    }
}
  • main为当前线程,尝试调节sleep的值会得到不同的结果,main结束之后所有的后台线程也会结束

image-20220213145556837

SimpleDaemon创建了显视的线程,通过编写定制的ThreadFactory可以定制由Exectuor创建的额线程的属性(后台,优先级,名称):

net/mindview/util/DaemonThreadFactor.java

package net.mindview.util;
import java.util.concurrent.*;
public class DeamonThreadFactory implements ThreadFactory{
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    }
}

这里将后台状态设置为了true

concurrency/DaemonFronFactory.java

import java.sql.Time;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DaemonFromFactor implements Runnable{
    public void run(){
        try{
            while(true){
                TimeUnit.MILLISECONDS.sleep(100);
                System.out.println(Thread.currentThread()+ " "+this);
            }
        }catch (InterruptedException e){
            System.out.println(e);
        }
    }
    public static void main(String[] args) throws Exception{
        /**
         * 提供一个ThreadFactor类
         */
        ExecutorService exec = Executors.newCachedThreadPool(new DaemonThreadFactor());
        /**
         * 所有线程默认都是后台线程
         */
        for(int i=0;i<10;i++){
            exec.execute(new DaemonFromFactor());
        }
        System.out.println("All Daemon Start");
        TimeUnit.MILLISECONDS.sleep(500);
    }
}

image-20220213150210714

import java.util.concurrent.TimeUnit;

class Daemon implements Runnable{
    private Thread[] t = new Thread[10];
    public void run(){
        for(int i=0;i<10;i++){
            t[i] = new Thread(new DaemonSpawn());
            t[i].start();
            System.out.println("DaemonSpwan" + i+" started . ");
        }
        for(int i=0;i<t.length;i++){
            System.out.println("t["+i+"].isDaemon = "+t[i].isDaemon()+". ");

        }
        /**
         * 进入无限循环不停将控制权交给不同的线程,由于是后台线程,main线程结束之后后后台线程结束
         */
        while(true)
            Thread.yield();
    }

}


class DaemonSpawn implements Runnable{
    public void run(){
        while (true)
            Thread.yield();
    }
}

public class Daemons {
    public static void main(String[] args) throws Exception{
        Thread  d = new Thread(new Daemon());
        d.setDaemon(true);
        d.start();
        System.out.println("d.isDaemon = "+d.isDaemon()+". ");
        TimeUnit.SECONDS.sleep(1);

    }

}

1.9 编码的变体

package concurrency;

public class SimpleThread extends Thread{
    private int countDown = 5;
    private static int threadCount = 0;
    public SimpleThread() {
        //Store Thread name
        super(Integer.toString(++threadCount));
        start();
    }
    public String toString() {
        return "#" + getName() + "(" +countDown + ") .";//通过getName()获取
    }
    public void run(){
        while(true){
            System.out.println(this);
            if(--countDown == 0){
                return;
            }
        }

    }
    public static void main(String[] args){
        for(int i = 0;i < 5;i++){
            new SimpleThread();
        }
    }
}

image-20220218100118219

package concurrency;

public class selfManaged implements Runnable {
    private int countDown = 5;
    private Thread t = new Thread(this);
    public selfManaged(){
        t.start();
    }
    public String toString(){
        return  Thread.currentThread().getName() + "(" + countDown + "). ";
    }
    public void run(){
        while(true){
            System.out.println(this);
            if(--countDown == 0){
                return;
            }
        }
    }
    public static void main(String[] args){
        for(int i = 0;i<5;i++){
            new selfManaged();
        }
    }
}

image-20220218101148099

1.10 术语

线程并不是执行这样或者那样的动作,他只是驱动赋予他的任务,我们用术语“任务”,表示将要执行的动作

1.11 加入一个线程

一个线程可以在另一个线程之上调用join()方法,效果是等待一段时间直到第二个线程结束继续执行,

join也可被中断,调用线程上的interrupt,这时需要用到try,catch语句

package concurrency;

class Sleeper extends Thread{
    private int duration;
    public Sleeper(String name,int sleepTime){
        super(name);
        duration = sleepTime;
        start();
    }
    public void run(){
        try{
            sleep(duration);
        }catch(InterruptedException e){
            System.out.print(getName() + " was interrupted. "+ " isInterrupted(): " + isInterrupted());
            return;
        }
        System.out.println(getName() + " has awakended");
    }
}
class Joiner extends Thread{
    private Sleeper sleeper;
    public Joiner(String name,Sleeper sleeper){
        super(name);
        this.sleeper = sleeper;
        start();
    }
    public void run(){
        try{
            sleeper.join();
        }catch(InterruptedException e){
            System.out.println("Interrupted");
        }
        System.out.println(getName() + " join completed");
    }
}
public class Joining {
    public static void main(String[] args){
        Sleeper sleepy = new Sleeper("Sleepy",1500),
        grumpy = new Sleeper("Grumpy",1500);
        Joiner doepy = new Joiner("Doepy",sleepy),doc = new Joiner("Doc",grumpy);
        grumpy.interrupted();
    }
}

Joiner线程通过Sleeper对象上调用join方法等待Sleeper醒来,在main里面每一个sleeper都有一个Joiner,如果sleeper被中断或者是正常结束,Joiner将和sleeper一同结束

![image-20220220162041477](/Users/liuguangquan/Library/Application Support/typora-user-images/image-20220220162041477.png)

1.12 捕获异常

由于线程本质的特征,使得不能捕获从线程中逃逸的异常,一旦异常逃出任务的run方法,他就会向外传播到控制台,除非采用特殊的步骤捕获这种异常

采用Executor可以捕获异常,该异常通常会被传播到run方法的内部

Thread.UncaughtException-Handler允许在一个Thread对象上都附着一个异常处理器。Thread.UncaughtExceptionHandler.uncaughtException()会在线程因未捕获异常而临近死亡时调用

package concurrency;

import java.util.concurrent.*;

class ExceptionThread2 implements Runnable{
    public void run(){
        Thread t = Thread.currentThread();
        System.out.println("run() by "+ t);
        System.out.println("eh = "+t.getUncaughtExceptionHandler());
        throw new RuntimeException();
    }
}
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
    public void uncaughtException(Thread t,Throwable e){
        System.out.println("caught "+e);
    }
}
class HandlerThreadFactory implements ThreadFactory{
    public Thread newThread(Runnable r){
        System.out.println(this + " creating new Thread");
        Thread t = new Thread(r);
        System.out.println("created "+ t);
        t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
        System.out.println("eh= "+t.getUncaughtExceptionHandler());
        return t;
    }
}
public class CaptureUncaughtException {
    public static void main(String[] args){
        ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
        exec.execute(new ExceptionThread2());
    }
}

image-20220221124934768

2. 共享受限资源

并发是多个线程同时搞事情,如果多个线程同时访问一个资源会造成很明显的冲突,比如访问同一个银行账户,向同一个打印机打印

2.1 不正确的访问资源


文章作者: 尘落
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 尘落 !
评论
  目录