异步任务的方式
new Thread 、AsyncTask 、HandlerThread 、IntentService 、ThreadPoolExecutor。
AsyncTask:
static class MyAsyncTask extends AsyncTask<String,Integer,String>{
@Override
protected String doInBackground(String... params) {
for (int i = 0; i < 10; i++) {
publishProgress(i*10);
}
return params[0];
}
@Override
protected void onProgressUpdate(Integer... values) {//主线程回调
Log.e(TAG, "onProgressUpdate: " + values[0]);
}
@Override
protected void onPostExecute(String result) {//执行结果
Log.e(TAG, "onPostExecute: " + result);
}
}
MyAsyncTask asyncTask = new MyAsyncTask();
asyncTask.execute("execute mytask");
//方式二:串联执行,如果其中有一条任务休眠了,或者执行时间过长,后面任务将会被阻塞
AsyncTask.execute(new Runnable() {
@Override
public void run() {
Log.e(TAG, "run: AsyncTask execute");
}
});
//方式三:并发任务执行
AsyncTask.THREAD_POOL_EXECUTOR.execute(new Runnable() {
@Override
public void run() {
Log.e(TAG, "run: THREAD_POOL_EXECUTOR AsyncTask execute");
}
});
HandlerThread:
static class MyHandler extends Handler{
@Override
public void handleMessage(@NonNull Message msg) {
super.handleMessage(msg);
Log.e(TAG, "handleMessage: " + msg.what);
Log.e(TAG, "handleMessage: " + Thread.currentThread().getName());
}
public MyHandler(@NonNull Looper looper) {
super(looper);
}
}
//1、先开启handlerThread-适用于主线程需要和子线程通信的场景,应用于持续性任务,比如轮训,
HandlerThread handlerThread=new HandlerThread("handler-thread");
handlerThread.start();
MyHandler myHandler= new MyHandler(handlerThread.getLooper());
myHandler.sendEmptyMessage(MSG_WHAT_1);//主线程向子线程handler-thread发送消息
//只能quit
//handlerThread.quit();
//handlerThread.quitSafely();
手写主线程向子线程发送消息:
class LooperThread extends Thread {
private Looper looper;
public LooperThread(String name) {
super(name);
}
public Looper getLooper() {
synchronized (this){
if (looper==null&&isAlive()) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return looper;
}
@Override
public void run() {
Looper.prepare();
synchronized (this){
looper=Looper.myLooper();
notify();
}
Looper.loop();
}
}
LooperThread thread=new LooperThread("child-looper");
thread.start();
Handler handler = new Handler(thread.getLooper()) {
@Override
public void handleMessage(@NonNull Message msg) {
super.handleMessage(msg);
Log.e(TAG, "handleMessage: " + msg.what);
Log.e(TAG, "handleMessage: " + Thread.currentThread().getName());
}
};
handler.sendEmptyMessage(MSG_WHAT_1);
创建线程
两种方法:
- 继承Thread类
- 实现Runnable接口
共同点:都要用start()开启线程
创建子线程方式1:
Thread thread = new Thread() {
@Override
public void run() {
//super.run();
System.out.println("Thread started!");
}
};
thread.start();
start调用了native方法。
或者
class MyThread1 extends Thread{
@Override
public void run() {
super.run();
System.out.println("MyThread1:"+Thread.currentThread().getName());
}
}
new MyThread1().start();//创建并启动线程
创建子线程方式2:
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("Thread with Runnable started!");
}
};
Thread thread = new Thread(runnable);
thread.start();
或者
class MyThread2 implements Runnable{
@Override
public void run() {
System.out.println("MyThread2:"+Thread.currentThread().getName());
}
}
new Thread(new MyThread2()).start();//创建并启动线程
//或者
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("MyThread3:"+Thread.currentThread().getName());
}
}).start();
ThreadFactory
ThreadFactory factory = new ThreadFactory() {
AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Thread-" + count.incrementAndGet());
}
};
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " started!");
}
};
Thread thread = factory.newThread(runnable);
thread.start();
Thread thread1 = factory.newThread(runnable);
thread1.start();
Executors
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("Thread with Runnable started!");
}
};
Executor executor = Executors.newCachedThreadPool();
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
除了newCachedThreadPool还有newSingleThreadExecutor单线程池、newFixedThreadPool指定固定线程数的线程池。
ExecutorService executor = Executors.newFixedThreadPool(20);
for (Bitmap bitmap : bitmaps) {
executor.execute(bitmapProcessor(bitmap));
}
executor.shutdown();
lock
同样是「加锁」机制。但使用方法更灵活,同时也更麻烦一些。
Lock lock = new ReentrantLock();
...
lock.lock();
try {
x++;
} finally {
lock.unlock();
}
一般并不会只是使用Lock,而是会使用更复杂的锁ReentrantReadWriteLock:
public class ReadWriteLockDemo {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
Lock readLock = lock.readLock();
Lock writeLock = lock.writeLock();
private int x = 0;
private void count() {
writeLock.lock();
try {
x++;
} finally {
writeLock.unlock();
}
}
private void print(int time) {
readLock.lock();
try {
for (int i = 0; i < time; i++) {
System.out.print(x + " ");
}
System.out.println();
} finally {
readLock.unlock();
}
}
public void runTest() {
new Thread() {
@Override
public void run() {
for (int i = 0; i < 1_000_000; i++) {
count();
}
System.out.println("final x from 1: " + x);
}
}.start();
new Thread() {
@Override
public void run() {
print(1);
}
}.start();
new Thread() {
@Override
public void run() {
print(2);
}
}.start();
new Thread() {
@Override
public void run() {
print(3);
}
}.start();
new Thread() {
@Override
public void run() {
print(4);
}
}.start();
new Thread() {
@Override
public void run() {
print(5);
}
}.start();
}
}
wait
在未达到目标时 wait();用 while 循环检查;设置完成后 notifyAll();wait() 和 notify() / notifyAll() 都需要放在同步代码块里。
public class WaitDemo {
private String sharedString;
private synchronized void initString() {
sharedString = "kkk";
notifyAll();
}
private synchronized void printString() {
while (sharedString == null) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("String: " + sharedString);
}
public void runTest() {
final Thread thread1 = new Thread() {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
printString();
}
};
thread1.start();
Thread thread2 = new Thread() {
@Override
public void run() {
Thread.yield();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
initString();
}
};
thread2.start();
}
}
其他
Service:后台任务的活动空间。适用场景:音乐播放器等。IntentService:执行单个任务后自动关闭的 Service。
如果在界面组件里创建 Executor 或者 HandlerThread,记得要在关闭的时候(例如Activity.onDestroy() )关闭 Executor 和 HandlerThread。
@Override
protected void onDestroy() {
super.onDestroy();
executor.shutdown();
}
@Override
protected void onDestroy() {
super.onDestroy();
handlerThread.quit(); // 这个其实就是停⽌ Looper 的循环
}
线程安全
synchronized
synchronized 的本质:保证方法内部或代码块内部资源(数据)的互斥访问。即同一时间、由同一个Monitor监视的代码,最多只能有一个线程在访问。
synchronized定义在方法上,Monitor默认指定的是这个类;方法中使用
synchronized(this)
,Monitor指定的也是这个类;如果想更换Monitor,就要把this换成其他对象。即多个线程,访问同一个类的多个方法,就要处理Monitor是不是同一个。
如果线程1正在访问count方法,那么线程2无法访问到minus方法,因为两个方法是同一个Monitor:
private synchronized void count(int newValue) {
x = newValue;
y = newValue;
}
private void minus(int delta) {
synchronized (this) {
x -= delta;
y -= delta;
}
}
如果线程1正在访问count方法,线程2可以访问到minus方法,因为两个方法不是同一个Monitor,是两个Monitor:
private final Object monitor1 = new Object();
private synchronized void count(int newValue) {
x = newValue;
y = newValue;
}
private void minus(int delta) {
synchronized (monitor1) {
x -= delta;
y -= delta;
}
}
保证线程之间对监视资源的数据同步。即,任何线程在获取到 Monitor 后的第一时间,会先将共享内存中的数据复制到自己的缓存中;任何线程在释放 Monitor 的第一时间,会先将缓存中的数据复制到共享内存中。
如果 synchronized加在代码块上面,为获取到对象锁的线程,可以访问同步代码块之外的代码。
如果synchronized加载方法上,为获取到对象锁的线程,只能排队,不能访问。
加在static 方法上面,就相对是给Class对象加锁,由于在jvm中只会存在一份class对象。所以此时无论是不是同一个java对象,去访问同步访问,都只能排队。
volatile
保证加了 volatile 关键字的字段的操作具有原子性和同步性,其中原子性相当于实现了针对单一字段的线程间互斥访问。因此 volatile 可以看做是简化版的 synchronized。
volatile 只对基本类型 (byte、char、short、int、long、float、double、boolean) 的赋值操作和对象的引用赋值操作有效。
原子性是cup执行的最小单位。
注意:count ++
和 count= count + 1
不具有原子性。参考下面对比。
atomic
java.util.concurrent.atomic 包:
下面有 AtomicInteger AtomicBoolean 等类,作用和 volatile 基本一致,可以看做是通用版的 volatile。
AtomicInteger atomicInteger = new AtomicInteger(0);
...
atomicInteger.getAndIncrement();
volatile和atomic对比
public class AtomicDemo {
public static void main(String[] args) throws InterruptedException {
AtomicTask task=new AtomicTask();
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
task.incrementVolatile();
task.incrementAtomic();
}
}
};
Thread t1 = new Thread(runnable);
Thread t2 = new Thread(runnable);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("\n"+task.atomicInteger.get()+"\n"+task.volatileCount);
}
static class AtomicTask{
AtomicInteger atomicInteger=new AtomicInteger();
volatile int volatileCount=0;
void incrementAtomic(){
atomicInteger.getAndIncrement();
}
void incrementVolatile(){
// volatile修饰的成员变量在每次被线程访问时,都从共享内存重新读取该成员的值,
// 而当成员变量值发生变化时,将变化的值重新写入共享内存
volatileCount++;
//不能解决非原子操作的线程安全性。性能不及原子类高
}
}
}
结果:atomicInteger值为20000,而volatileCount值不是20000。
ReentrantLock
基本用法:
ReentrantLock reentrantLock = new ReentrantLock();
try {
reentrantLock.lock();
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
ReentrantLock可以重复调用lock,不会发生死锁,比如递归。
公平锁和非公平锁:
//公平锁,即进入阻塞的线程依次排队去获取锁,均有机会执行
//默认是非公平锁 后加入的线程,尝试插队一次去获取锁,若获取到直接执行,不用进入阻塞。
// 允许线程插队,避免每个线程都阻塞再唤醒的资源消耗。但也有可能线程饿死(即一直得不到锁无法执行)
ReentrantLock lock = new ReentrantLock(true/false);
condition 条件对象:
Condition condition = lock.newCondition();
ReadLock和WriteLock:
ReentrantReadWriteLock lock=new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = lock.readLock();//共享锁;
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();//排他锁
线程池ThreadPoolExecutor
构造方法参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:线程基本大小
- maximumPoolSize:线程池最大所能容纳线程大小
- keepAliveTime:线程活动保持时间
- workQueue:阻塞队列
- threadFactory创建线程的工厂
- handler:饱和策略
手动创建线程池
仿照AsyncTask源码,写个线程池。缓存队列Runnable,加入到线程队列中去执行。
public class ThreadPoolDemo {
static ThreadPoolExecutor threadPoolExecutor;
private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<>(128);
static {
threadPoolExecutor = new ThreadPoolExecutor(
4,// 核心线程数,就是线程池里面的核心线程数量
10, // 最大线程数,线程池中的最大线程数
60,// 线程存活的时间,没事干的时候的空闲存活时间,超过这个时间线程就会被销毁
TimeUnit.SECONDS,// 线程存活时间的单位
sPoolWorkQueue,// 线程队列
new ThreadFactory() {//线程创建工厂,如果线程池需要创建线程就会调用newThread 来创建
@Override
public Thread newThread(@NonNull Runnable r) {
// Thread thread = new Thread(r,"自己线程的名字");
// thread.setDaemon(false); // 不是守护线程
return new Thread(r);
}
});
}
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("下载图片显示完毕"+Thread.currentThread().getName());
}
};
// 加入线程队列,寻找合适的时机去执行
threadPoolExecutor.execute(runnable);
}
}
}
输出结果:
Runnable缓存队列大于WorkQueue线程队列就可能会报错。
workQueue的参数
BlockingQueue: 先进先出的一个队列 FIFO(rxjava,AsyncTask)
SynchronousQueue: 线程安全的队列,它里面是没有固定的缓存的(OKHttp所使用的)
PriorityBlockingQueue: 无序的可以根据优先级进行排序,指定的对象要实现Comparable作比较
有限队列SynchronousQueue
和ArrayBlockingQueue
;无限队列LinkedBloackingQueue
。
PriorityBlockingQueue示例
public class Request implements Runnable,Comparable<Request>{
@Override
public void run() {
System.out.println("run");
}
@Override
public int compareTo(@NonNull Request o) {
return 0;
}
}
public class ThreadPoolTest {
static ThreadPoolExecutor threadPoolExecutor;
private static final BlockingQueue<Runnable> sPoolWorkQueue = new PriorityBlockingQueue<Runnable>(4);
static {
threadPoolExecutor = new ThreadPoolExecutor(
4,// 核心线程数,就是线程池里面的核心线程数量
10, // 最大线程数,线程池中的最大线程数
60,// 线程存活的时间,没事干的时候的空闲存活时间,超过这个时间线程就会被销毁
TimeUnit.SECONDS,// 线程存活时间的单位
sPoolWorkQueue,// 线程队列
new ThreadFactory() {// 线程创建工厂,如果线程池需要创建线程就会调用 newThread 来创建
@Override
public Thread newThread(@NonNull Runnable r) {
return new Thread(r);
}
});
}
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
Request request = new Request();
// 加入线程队列,寻找合适的时机去执行
threadPoolExecutor.execute(request);
}
}
}
协程
协程是一种解决方案,是一种解决嵌套,并发,弱化线程概念的方案。能让多个任务之间更好的协作,能够以同步的方式编排代码完成异步工作。将异步代码写的像同步代码一样直观。
引入
//在kotlin项目配合jetpack架构项目中引入协程
implementation 'androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0'
implementation 'androidx.lifecycle:lifecycle-runtime-ktx:2.2.0'
implementation 'androidx.lifecycle:lifecycle-livedata-ktx:2.2.0'
//在kotlin项目但非jetpack架构项目中引入
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.1.1'
startScene1:线程1、2、3依次按顺序,在主线程执行,最后执行updateUI,startScene2:线程1在主线程执行,然后同步执行子线程2、3,最后执行updateUI:
import android.util.Log
import kotlinx.coroutines.*
object CoroutineScene {
private val TAG: String = "CoroutineScene"
fun startScene1() {
GlobalScope.launch(Dispatchers.Main) {
Log.e(TAG, "coroutine is running")
val result1 = request1()
val result2 = request2(result1)
val result3 = request3(result2)
updateUI(result3)
}
Log.e(TAG, "coroutine has launched")
}
fun startScene2(){
GlobalScope.launch(Dispatchers.Main){
Log.e(TAG, "coroutine is running")
val result1 = request1()
val deferred2 = GlobalScope.async { request2(result1) }
val deferred3 = GlobalScope.async { request3(result1) }
updateUI(deferred2.await(),deferred3.await())
}
Log.e(TAG, "coroutine has launched")
}
private fun updateUI(result2: String, result3: String) {
Log.e(TAG, "updateui work on ${Thread.currentThread().name}")
Log.e(TAG, "paramter:" + result3 + "---" + result2)
}
private fun updateUI(result3: String) {
Log.e(TAG, "updateui work on ${Thread.currentThread().name}")
Log.e(TAG, "paramter:" + result3)
}
suspend fun request1(): String {
//不会暂停线程,但会暂停当前所在的协程
delay(2 * 1000)
//Thread.sleep(2000) 让线程休眠
Log.e(TAG, "request1 work on ${Thread.currentThread().name}")
return "result from request1"
}
suspend fun request2(result1: String): String {
delay(2 * 1000)
Log.e(TAG, "request2 work on ${Thread.currentThread().name}")
return "result from request2"
}
suspend fun request3(result2: String): String {
delay(2 * 1000)
Log.e(TAG, "request3 work on ${Thread.currentThread().name}")
return "result from request3"
}
}
协程的核心是挂起和恢复,挂起是方法的挂起,方法的挂起本质是 return
,恢复是方法的恢复,恢复的本质是 callback
回调:
//kotlin源代码
suspend fun request2(): String {
delay(2 * 1000)
Log.e(TAG, "request2 completed")
return "result from request2"
}
//手写实现kotlin编译后代码
import android.util.Log;
import androidx.annotation.NonNull;
import org.jetbrains.annotations.NotNull;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlinx.coroutines.DelayKt;
public class CoroutineScene2_decompiled {
private static final String TAG = "CoroutineScene2";
public static final Object request2(Continuation preCallback){
ContinuationImpl request2CallBack;
if (!(preCallback instanceof ContinuationImpl)||(((ContinuationImpl) preCallback).label&Integer.MIN_VALUE)==0){
request2CallBack=new ContinuationImpl(preCallback) {
@Override
Object invokeSuspend(@NonNull Object resumeResult) {
this.result=resumeResult;
this.label|=Integer.MIN_VALUE;
return request2(this);
}
};
}else {
request2CallBack= (ContinuationImpl) preCallback;
}
switch (request2CallBack.label) {
case 0:{
Object delay = DelayKt.delay(2000, request2CallBack);
if (delay== IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
return IntrinsicsKt.getCOROUTINE_SUSPENDED();
}
}
}
Log.e(TAG, "request1 completed");
return "result from request2";
}
static abstract class ContinuationImpl<T>implements Continuation<T>{
private Continuation preCallback;
int label;
Object result;
public ContinuationImpl(Continuation preCallback) {
this.preCallback = preCallback;
}
@NotNull
@Override
public CoroutineContext getContext() {
return preCallback.getContext();
}
@Override
public void resumeWith(@NotNull Object resumeResult) {
Object suspend = invokeSuspend(resumeResult);
if (suspend== IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
return;
}
preCallback.resumeWith(suspend);
}
abstract Object invokeSuspend(@NonNull Object resumeResult);
}
}
lifecycleScope
countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
Semaphore 通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量。