线程和线程池


异步任务的方式


new Thread 、AsyncTask 、HandlerThread 、IntentService 、ThreadPoolExecutor。

AsyncTask:

static class MyAsyncTask extends AsyncTask<String,Integer,String>{
    @Override
    protected String doInBackground(String... params) {
        for (int i = 0; i < 10; i++) {
            publishProgress(i*10);
        }
        return params[0];
    }

    @Override
    protected void onProgressUpdate(Integer... values) {//主线程回调
        Log.e(TAG, "onProgressUpdate: " + values[0]);
    }

    @Override
    protected void onPostExecute(String result) {//执行结果
        Log.e(TAG, "onPostExecute: " + result);
    }
}

MyAsyncTask asyncTask = new MyAsyncTask();
asyncTask.execute("execute mytask");

//方式二:串联执行,如果其中有一条任务休眠了,或者执行时间过长,后面任务将会被阻塞
AsyncTask.execute(new Runnable() {
    @Override
    public void run() {
        Log.e(TAG, "run: AsyncTask execute");
    }
});

//方式三:并发任务执行
AsyncTask.THREAD_POOL_EXECUTOR.execute(new Runnable() {
    @Override
    public void run() {
        Log.e(TAG, "run: THREAD_POOL_EXECUTOR AsyncTask execute");
    }
});

HandlerThread:

static class MyHandler extends Handler{
    @Override
    public void handleMessage(@NonNull Message msg) {
        super.handleMessage(msg);
        Log.e(TAG, "handleMessage: " + msg.what);
        Log.e(TAG, "handleMessage: " + Thread.currentThread().getName());
    }

    public MyHandler(@NonNull Looper looper) {
        super(looper);
    }
}

//1、先开启handlerThread-适用于主线程需要和子线程通信的场景,应用于持续性任务,比如轮训,
HandlerThread handlerThread=new HandlerThread("handler-thread");
handlerThread.start();
MyHandler myHandler= new MyHandler(handlerThread.getLooper());
myHandler.sendEmptyMessage(MSG_WHAT_1);//主线程向子线程handler-thread发送消息

//只能quit
//handlerThread.quit();
//handlerThread.quitSafely();

手写主线程向子线程发送消息:

class LooperThread extends Thread {
        private Looper looper;

        public LooperThread(String name) {
            super(name);
        }

        public Looper getLooper() {
            synchronized (this){
                if (looper==null&&isAlive()) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            return looper;
        }

        @Override
        public void run() {
            Looper.prepare();
            synchronized (this){
                looper=Looper.myLooper();
                notify();
            }
            Looper.loop();
        }
    }

LooperThread thread=new LooperThread("child-looper");
thread.start();

Handler handler = new Handler(thread.getLooper()) {
    @Override
    public void handleMessage(@NonNull Message msg) {
        super.handleMessage(msg);
        Log.e(TAG, "handleMessage: " + msg.what);
        Log.e(TAG, "handleMessage: " + Thread.currentThread().getName());
    }
};
handler.sendEmptyMessage(MSG_WHAT_1);

创建线程

两种方法:

  • 继承Thread类
  • 实现Runnable接口

共同点:都要用start()开启线程

创建子线程方式1:


Thread thread = new Thread() {
    @Override
    public void run() {
        //super.run();
        System.out.println("Thread started!");
    }
};
thread.start();

start调用了native方法。

或者

class MyThread1 extends Thread{
    @Override
    public void run() {
        super.run();
        System.out.println("MyThread1:"+Thread.currentThread().getName());
    }
}
new MyThread1().start();//创建并启动线程

创建子线程方式2:


Runnable runnable = new Runnable() {
    @Override
    public void run() {
        System.out.println("Thread with Runnable started!");
    }
};
Thread thread = new Thread(runnable);
thread.start();

或者

class MyThread2 implements Runnable{
    @Override
    public void run() {
        System.out.println("MyThread2:"+Thread.currentThread().getName());
    }
}

new Thread(new MyThread2()).start();//创建并启动线程

//或者
new Thread(new Runnable() {
    @Override
    public void run() {
        System.out.println("MyThread3:"+Thread.currentThread().getName());
    }
}).start();

ThreadFactory


ThreadFactory factory = new ThreadFactory() {
    AtomicInteger count = new AtomicInteger(0);
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "Thread-" + count.incrementAndGet());
    }
};

Runnable runnable = new Runnable() {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " started!");
    }
};

Thread thread = factory.newThread(runnable);
thread.start();
Thread thread1 = factory.newThread(runnable);
thread1.start();

Executors


Runnable runnable = new Runnable() {
    @Override
    public void run() {
        System.out.println("Thread with Runnable started!");
    }
};
Executor executor = Executors.newCachedThreadPool();
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);

除了newCachedThreadPool还有newSingleThreadExecutor单线程池、newFixedThreadPool指定固定线程数的线程池。

ExecutorService executor = Executors.newFixedThreadPool(20);
for (Bitmap bitmap : bitmaps) {
 	executor.execute(bitmapProcessor(bitmap));
}
executor.shutdown();

lock


同样是「加锁」机制。但使用方法更灵活,同时也更麻烦一些。

Lock lock = new ReentrantLock();
...
lock.lock();
try {
    x++;
} finally {
    lock.unlock();
}

一般并不会只是使用Lock,而是会使用更复杂的锁ReentrantReadWriteLock:

public class ReadWriteLockDemo {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    Lock readLock = lock.readLock();
    Lock writeLock = lock.writeLock();
    private int x = 0;
    private void count() {
        writeLock.lock();
        try {
            x++;
        } finally {
            writeLock.unlock();
        }
    }

    private void print(int time) {
        readLock.lock();
        try {
            for (int i = 0; i < time; i++) {
                System.out.print(x + " ");
            }
            System.out.println();
        } finally {
            readLock.unlock();
        }
    }

    public void runTest() {
        new Thread() {
            @Override
            public void run() {
                for (int i = 0; i < 1_000_000; i++) {
                    count();
                }
                System.out.println("final x from 1: " + x);
            }
        }.start();
        new Thread() {
            @Override
            public void run() {
                print(1);
            }
        }.start();
        new Thread() {
            @Override
            public void run() {
                print(2);
            }
        }.start();
        new Thread() {
            @Override
            public void run() {
                print(3);
            }
        }.start();
        new Thread() {
            @Override
            public void run() {
                print(4);
            }
        }.start();
        new Thread() {
            @Override
            public void run() {
                print(5);
            }
        }.start();
    }
}

wait

在未达到目标时 wait();用 while 循环检查;设置完成后 notifyAll();wait() 和 notify() / notifyAll() 都需要放在同步代码块里。

public class WaitDemo {
    private String sharedString;
    private synchronized void initString() {
        sharedString = "kkk";
        notifyAll();
    }

    private synchronized void printString() {
        while (sharedString == null) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("String: " + sharedString);
    }

    public void runTest() {
        final Thread thread1 = new Thread() {
            @Override
            public void run() {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                printString();
            }
        };
        thread1.start();
        Thread thread2 = new Thread() {
            @Override
            public void run() {
                Thread.yield();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                initString();
            }
        };
        thread2.start();
    }
}

其他

Service:后台任务的活动空间。适用场景:音乐播放器等。IntentService:执行单个任务后自动关闭的 Service。

如果在界面组件里创建 Executor 或者 HandlerThread,记得要在关闭的时候(例如Activity.onDestroy() )关闭 Executor 和 HandlerThread。

@Override
protected void onDestroy() {
 super.onDestroy();
 executor.shutdown();
}

@Override
protected void onDestroy() {
 super.onDestroy();
 handlerThread.quit(); // 这个其实就是停⽌ Looper 的循环
}

线程安全


synchronized


synchronized 的本质:保证方法内部或代码块内部资源(数据)的互斥访问。即同一时间、由同一个Monitor监视的代码,最多只能有一个线程在访问

synchronized定义在方法上,Monitor默认指定的是这个类;方法中使用synchronized(this),Monitor指定的也是这个类;如果想更换Monitor,就要把this换成其他对象。即多个线程,访问同一个类的多个方法,就要处理Monitor是不是同一个。

如果线程1正在访问count方法,那么线程2无法访问到minus方法,因为两个方法是同一个Monitor:

private synchronized void count(int newValue) {
    x = newValue;
    y = newValue;
}

private void minus(int delta) {
    synchronized (this) {
        x -= delta;
        y -= delta;
    }
}

如果线程1正在访问count方法,线程2可以访问到minus方法,因为两个方法不是同一个Monitor,是两个Monitor:

private final Object monitor1 = new Object();
private synchronized void count(int newValue) {
    x = newValue;
    y = newValue;
}
private void minus(int delta) {
    synchronized (monitor1) {
        x -= delta;
        y -= delta;
    }
}

保证线程之间对监视资源的数据同步。即,任何线程在获取到 Monitor 后的第一时间,会先将共享内存中的数据复制到自己的缓存中;任何线程在释放 Monitor 的第一时间,会先将缓存中的数据复制到共享内存中。

如果 synchronized加在代码块上面,为获取到对象锁的线程,可以访问同步代码块之外的代码。

如果synchronized加载方法上,为获取到对象锁的线程,只能排队,不能访问。

加在static 方法上面,就相对是给Class对象加锁,由于在jvm中只会存在一份class对象。所以此时无论是不是同一个java对象,去访问同步访问,都只能排队。


volatile


保证加了 volatile 关键字的字段的操作具有原子性和同步性,其中原子性相当于实现了针对单一字段的线程间互斥访问。因此 volatile 可以看做是简化版的 synchronized。

volatile 只对基本类型 (byte、char、short、int、long、float、double、boolean) 的赋值操作和对象的引用赋值操作有效。

原子性是cup执行的最小单位。

注意:count ++count= count + 1不具有原子性。参考下面对比。


atomic


java.util.concurrent.atomic 包:

下面有 AtomicInteger AtomicBoolean 等类,作用和 volatile 基本一致,可以看做是通用版的 volatile。

AtomicInteger atomicInteger = new AtomicInteger(0);
...
atomicInteger.getAndIncrement();

volatile和atomic对比


public class AtomicDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicTask task=new AtomicTask();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    task.incrementVolatile();
                    task.incrementAtomic();
                }
            }
        };
        Thread t1 = new Thread(runnable);
        Thread t2 = new Thread(runnable);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("\n"+task.atomicInteger.get()+"\n"+task.volatileCount);
    }
    static class AtomicTask{
        AtomicInteger atomicInteger=new AtomicInteger();
        volatile int volatileCount=0;
        void incrementAtomic(){
            atomicInteger.getAndIncrement();
        }
        void incrementVolatile(){
            // volatile修饰的成员变量在每次被线程访问时,都从共享内存重新读取该成员的值,
            // 而当成员变量值发生变化时,将变化的值重新写入共享内存
            volatileCount++;
            //不能解决非原子操作的线程安全性。性能不及原子类高
        }
    }
}

结果:atomicInteger值为20000,而volatileCount值不是20000。


ReentrantLock


基本用法:

ReentrantLock reentrantLock = new ReentrantLock();
try {
    reentrantLock.lock();
    
} catch (Exception e) {
    e.printStackTrace();
} finally {
    reentrantLock.unlock();
}

ReentrantLock可以重复调用lock,不会发生死锁,比如递归。

公平锁和非公平锁:

//公平锁,即进入阻塞的线程依次排队去获取锁,均有机会执行
//默认是非公平锁 后加入的线程,尝试插队一次去获取锁,若获取到直接执行,不用进入阻塞。
// 允许线程插队,避免每个线程都阻塞再唤醒的资源消耗。但也有可能线程饿死(即一直得不到锁无法执行)
ReentrantLock lock = new ReentrantLock(true/false);

condition 条件对象:

    Condition condition = lock.newCondition();

ReadLock和WriteLock:

    ReentrantReadWriteLock lock=new ReentrantReadWriteLock();

    ReentrantReadWriteLock.ReadLock readLock = lock.readLock();//共享锁;
    ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();//排他锁

线程池ThreadPoolExecutor

构造方法参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize:线程基本大小
  • maximumPoolSize:线程池最大所能容纳线程大小
  • keepAliveTime:线程活动保持时间
  • workQueue:阻塞队列
  • threadFactory创建线程的工厂
  • handler:饱和策略

手动创建线程池

仿照AsyncTask源码,写个线程池。缓存队列Runnable,加入到线程队列中去执行。

public class ThreadPoolDemo {

    static ThreadPoolExecutor threadPoolExecutor;

    private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<>(128);

    static {
        threadPoolExecutor = new ThreadPoolExecutor(
                4,// 核心线程数,就是线程池里面的核心线程数量
                10, // 最大线程数,线程池中的最大线程数
                60,// 线程存活的时间,没事干的时候的空闲存活时间,超过这个时间线程就会被销毁
                TimeUnit.SECONDS,// 线程存活时间的单位
                sPoolWorkQueue,// 线程队列
                new ThreadFactory() {//线程创建工厂,如果线程池需要创建线程就会调用newThread 来创建
                    @Override
                    public Thread newThread(@NonNull Runnable r) {
//                        Thread thread = new Thread(r,"自己线程的名字");
//                        thread.setDaemon(false); // 不是守护线程
                        return new Thread(r);
                    }
                });
    }

    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("下载图片显示完毕"+Thread.currentThread().getName());
                }
            };
            // 加入线程队列,寻找合适的时机去执行
            threadPoolExecutor.execute(runnable);
        }
    }
}

输出结果:

Runnable缓存队列大于WorkQueue线程队列就可能会报错。

workQueue的参数

BlockingQueue: 先进先出的一个队列 FIFO(rxjava,AsyncTask)
SynchronousQueue: 线程安全的队列,它里面是没有固定的缓存的(OKHttp所使用的)
PriorityBlockingQueue: 无序的可以根据优先级进行排序,指定的对象要实现Comparable作比较

有限队列SynchronousQueueArrayBlockingQueue;无限队列LinkedBloackingQueue

PriorityBlockingQueue示例

public class Request implements Runnable,Comparable<Request>{
    @Override
    public void run() {
        System.out.println("run");
    }

    @Override
    public int compareTo(@NonNull Request o) {
        return 0;
    }
}

public class ThreadPoolTest {
    static ThreadPoolExecutor threadPoolExecutor;

    private static final BlockingQueue<Runnable> sPoolWorkQueue = new PriorityBlockingQueue<Runnable>(4);

    static {
        threadPoolExecutor = new ThreadPoolExecutor(
                4,// 核心线程数,就是线程池里面的核心线程数量
                10, // 最大线程数,线程池中的最大线程数
                60,// 线程存活的时间,没事干的时候的空闲存活时间,超过这个时间线程就会被销毁
                TimeUnit.SECONDS,// 线程存活时间的单位
                sPoolWorkQueue,// 线程队列
                new ThreadFactory() {// 线程创建工厂,如果线程池需要创建线程就会调用 newThread 来创建
                    @Override
                    public Thread newThread(@NonNull Runnable r) {
                        return new Thread(r);
                    }
                });
    }

    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            Request request = new Request();
            // 加入线程队列,寻找合适的时机去执行
            threadPoolExecutor.execute(request);
        }
    }
}

协程


协程是一种解决方案,是一种解决嵌套,并发,弱化线程概念的方案。能让多个任务之间更好的协作,能够以同步的方式编排代码完成异步工作。将异步代码写的像同步代码一样直观。

引入

//在kotlin项目配合jetpack架构项目中引入协程
implementation  'androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0'
implementation 'androidx.lifecycle:lifecycle-runtime-ktx:2.2.0'
implementation 'androidx.lifecycle:lifecycle-livedata-ktx:2.2.0'

//在kotlin项目但非jetpack架构项目中引入
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.1.1'

startScene1:线程1、2、3依次按顺序,在主线程执行,最后执行updateUI,startScene2:线程1在主线程执行,然后同步执行子线程2、3,最后执行updateUI:

import android.util.Log
import kotlinx.coroutines.*

object CoroutineScene {
    private val TAG: String = "CoroutineScene"
    fun startScene1() {
        GlobalScope.launch(Dispatchers.Main) {
            Log.e(TAG, "coroutine is running")
            val result1 = request1()
            val result2 = request2(result1)
            val result3 = request3(result2)
            updateUI(result3)
        }
        Log.e(TAG, "coroutine has launched")
    }
    fun startScene2(){
        GlobalScope.launch(Dispatchers.Main){
            Log.e(TAG, "coroutine is running")
            val result1 = request1()
            val deferred2 = GlobalScope.async { request2(result1) }
            val deferred3 = GlobalScope.async { request3(result1) }
            updateUI(deferred2.await(),deferred3.await())
        }
        Log.e(TAG, "coroutine has launched")
    }
    private fun updateUI(result2: String, result3: String) {
        Log.e(TAG, "updateui work on ${Thread.currentThread().name}")
        Log.e(TAG, "paramter:" + result3 + "---" + result2)
    }

    private fun updateUI(result3: String) {
        Log.e(TAG, "updateui work on ${Thread.currentThread().name}")
        Log.e(TAG, "paramter:" + result3)
    }

    suspend fun request1(): String {
        //不会暂停线程,但会暂停当前所在的协程
        delay(2 * 1000)
        //Thread.sleep(2000)  让线程休眠
        Log.e(TAG, "request1 work on ${Thread.currentThread().name}")
        return "result from request1"
    }

    suspend fun request2(result1: String): String {
        delay(2 * 1000)
        Log.e(TAG, "request2 work on ${Thread.currentThread().name}")
        return "result from request2"
    }

    suspend fun request3(result2: String): String {
        delay(2 * 1000)
        Log.e(TAG, "request3 work on ${Thread.currentThread().name}")
        return "result from request3"
    }
}

协程的核心是挂起和恢复,挂起是方法的挂起,方法的挂起本质是 return ,恢复是方法的恢复,恢复的本质是 callback回调:

//kotlin源代码
suspend fun request2(): String {
    delay(2 * 1000)
    Log.e(TAG, "request2 completed")
    return "result from request2"
}

//手写实现kotlin编译后代码
import android.util.Log;
import androidx.annotation.NonNull;
import org.jetbrains.annotations.NotNull;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlinx.coroutines.DelayKt;

public class CoroutineScene2_decompiled {
    private static final String TAG = "CoroutineScene2";
    public static final Object request2(Continuation preCallback){
        ContinuationImpl request2CallBack;
        if (!(preCallback instanceof ContinuationImpl)||(((ContinuationImpl) preCallback).label&Integer.MIN_VALUE)==0){
            request2CallBack=new ContinuationImpl(preCallback) {
                @Override
                Object invokeSuspend(@NonNull Object resumeResult) {
                    this.result=resumeResult;
                    this.label|=Integer.MIN_VALUE;
                    return request2(this);
                }
            };
        }else {
            request2CallBack= (ContinuationImpl) preCallback;
        }
        switch (request2CallBack.label) {
            case 0:{
                Object delay = DelayKt.delay(2000, request2CallBack);
                if (delay== IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
                    return IntrinsicsKt.getCOROUTINE_SUSPENDED();
                }
            }
        }
        Log.e(TAG, "request1 completed");

        return "result from request2";
    }

    static abstract class ContinuationImpl<T>implements Continuation<T>{
        private Continuation preCallback;
        int label;
        Object result;

        public ContinuationImpl(Continuation preCallback) {
            this.preCallback = preCallback;
        }

        @NotNull
        @Override
        public CoroutineContext getContext() {
            return preCallback.getContext();
        }

        @Override
        public void resumeWith(@NotNull Object resumeResult) {
            Object suspend = invokeSuspend(resumeResult);
            if (suspend== IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
                return;
            }
            preCallback.resumeWith(suspend);
        }
        abstract Object invokeSuspend(@NonNull Object resumeResult);
    }
}

lifecycleScope

countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。

Semaphore 通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量。



文章作者:
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 !
  目录