Java并发应用 生产者与消费者模型 synchronize 基于synchronize 方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class Solution { public synchronized void produce () throws InterruptedException { while (present) { wait(); } System.out.println(Thread.currentThread() + " Producer produce meal " + count); count++; this .setPresent(true ); notifyAll(); } public synchronized void consume () throws InterruptedException { while (!present) { wait(); } System.out.println(Thread.currentThread() + " consumer present meal" + count); this .setPresent(false ); notifyAll(); } }
使用synchronize 锁对象 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class Solution { public void createByObject () throws InterruptedException { synchronized (this ) { while (present) { wait(); } } count++; System.out.println(Thread.currentThread() + " Producer produce meal " + count); this .setPresent(true ); synchronized (this ) { notifyAll(); } } public void consumerByObject () throws InterruptedException { synchronized (this ) { while (!present) { wait(); } } System.out.println(Thread.currentThread() + " consumer present meal " + count); this .setPresent(false ); synchronized (this ) { notifyAll(); } } }
问题代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class Solution { public void createByObject () throws InterruptedException { while (present) { synchronized (this ) { wait(); } } count++; System.out.println(Thread.currentThread() + " Producer produce meal " + count); this .setPresent(true ); synchronized (this ) { notifyAll(); } } public void consumerByObject () throws InterruptedException { while (!present) { synchronized (this ) { wait(); } } System.out.println(Thread.currentThread() + " consumer present meal " + count); this .setPresent(false ); synchronized (this ) { notifyAll(); } } public void createByObject () throws InterruptedException { synchronized (this ) { while (present) { wait(); } } } }
基于ReentrantLock 结合 condition
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 class Consumer implements Runnable { private ReentrantLock lock; private Condition condition; public Consumer (ReentrantLock lock, Condition condition) { this .lock = lock; this .condition = condition; } @Override public void run () { try { while (!Thread.interrupted()) { try { lock.lock(); while (!ProConDemo.flag) { condition.await(); } System.out.println(Thread.currentThread() + " consumer shout !!!!" ); ProConDemo.flag = false ; condition.signalAll(); } finally { lock.unlock(); } } } catch (InterruptedException e) { e.printStackTrace(); } } }class Producer implements Runnable { private ReentrantLock lock; private Condition condition; public Producer (ReentrantLock lock, Condition condition) { this .lock = lock; this .condition = condition; } @Override public void run () { try { while (!Thread.interrupted()) { try { lock.lock(); while (ProConDemo.flag) { condition.await(); } System.out.println(Thread.currentThread() + " producer shout~~~~" ); ProConDemo.flag = true ; condition.signalAll(); } finally { lock.unlock(); } } } catch (InterruptedException e) { e.printStackTrace(); } } }
基于BlockingQueue 使用阻塞队列实现生产者与消费者模型
消费者:queue.take();
生产者:queue.put(obj)
多线程顺序输出 Synchronize锁对象实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public class SynchronizeObject { static boolean flag = true ; public static void main (String[] args) throws InterruptedException { Object obj = new Object (); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor (10 , 20 , 10 , TimeUnit.SECONDS, new LinkedBlockingQueue <>(10 )); threadPoolExecutor.execute(new Thread1 (obj)); threadPoolExecutor.execute(new Thread2 (obj)); TimeUnit.SECONDS.sleep(3 ); threadPoolExecutor.shutdown(); } }class Thread1 implements Runnable { private final Object obj; public Thread1 (Object obj) { this .obj = obj; } @Override public void run () { try { while (!Thread.interrupted()) { synchronized (obj) { while (!SynchronizeObject.flag) { obj.wait(); } System.out.println(Thread.currentThread() + " this is thread1" ); SynchronizeObject.flag = false ; obj.notify(); } } } catch (InterruptedException e) { e.printStackTrace(); } } }class Thread2 implements Runnable { private final Object obj; public Thread2 (Object obj) { this .obj = obj; } @Override public void run () { try { while (!Thread.interrupted()) { synchronized (obj) { while (SynchronizeObject.flag) { obj.wait(); } System.out.println(Thread.currentThread() + " this is thread2~~" ); SynchronizeObject.flag = true ; obj.notify(); } } } catch (InterruptedException e) { e.printStackTrace(); } } }
公平锁实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Slf4j public class CorrectOrderLock { public static void main (String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock (true ); new Thread (createRunnable(lock, "A" )).start(); new Thread (createRunnable(lock, "B" )).start(); new Thread (createRunnable(lock, "C" )).start(); TimeUnit.SECONDS.sleep(3 ); } private static Runnable createRunnable (ReentrantLock lock, String msg) { return () -> { while (!Thread.currentThread().isInterrupted()) { try { lock.lock(); log.info(msg); } finally { lock.unlock(); } } }; } }
阻塞队列实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class BlockingQueueOrder { public static void main (String[] args) throws InterruptedException { BlockingQueue<Object> queue1 = new LinkedBlockingDeque <>(); BlockingQueue<Object> queue2 = new LinkedBlockingDeque <>(); BlockingQueue<Object> queue3 = new LinkedBlockingDeque <>(); new Thread (createRunnable(queue1, queue2, "A" )).start(); new Thread (createRunnable(queue2, queue3, "B" )).start(); new Thread (createRunnable(queue3, queue1, "C" )).start(); queue1.offer(new Object ()); new CountDownLatch (1 ).await(); } private static Runnable createRunnable (BlockingQueue<Object> consumerQueue, BlockingQueue<Object> producerQueue, String msg) { return () -> { while (!Thread.currentThread().isInterrupted()) { try { consumerQueue.take(); System.out.println(msg); producerQueue.offer(new Object ()); } catch (InterruptedException e) { e.printStackTrace(); } } }; } }
条件队列实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Slf4j public class ConditionOrder { private static final ReentrantLock lock = new ReentrantLock (); private static final Condition condition1 = lock.newCondition(); private static final Condition condition2 = lock.newCondition(); private static final Condition condition3 = lock.newCondition(); public static void main (String[] args) throws InterruptedException { Thread thread1 = new Thread (createRunnable(condition1, condition2, "A" )); Thread thread2 = new Thread (createRunnable(condition2, condition3, "B" )); Thread thread3 = new Thread (createRunnable(condition3, condition1, "C" )); thread1.start(); thread2.start(); thread3.start(); TimeUnit.SECONDS.sleep(1 ); lock.lock(); condition1.signal(); lock.unlock(); TimeUnit.SECONDS.sleep(3 ); } private static Runnable createRunnable (Condition waitCondition, Condition signalCondition, String msg) { return () -> { try { lock.lock(); while (!Thread.currentThread().isInterrupted()) { waitCondition.await(); log.info(msg); signalCondition.signal(); } } catch (Exception e) { Thread.currentThread().interrupt(); log.error("error:" , e); } finally { lock.unlock(); } }; } }
使用String常量作为synchronized的锁 优化同步锁 String是final的,每次对它的操作都会产生新的String,这很大程度上是安全性的考虑,但是产生大量的String也是会有一些问题的。
大量的String会对gc产生影响;
两次 new String(“aa”)操作,产生的String不一样,如果用这两个去做synchronized(String)操作就达不到想要的效果,因为synchronized必须是对同一个对象进行加锁才有效果。
以下为demo:
synchronized (lock)
输出为乱序
synchronized (lock.intern())
输出为顺序
synchronized (pool.intern(lock))
输出为顺序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class StringSynchronized { public static void main (String[] args) throws InterruptedException { Interner<String> pool = Interners.newWeakInterner(); for (int i = 1 ; i < 10 ; i++) { TestString billno123 = new TestString ("billNo:123123123" , i, pool); Thread thread = new Thread (billno123); thread.start(); } TimeUnit.SECONDS.sleep(1 ); System.out.println("finish" ); } }@Slf4j @AllArgsConstructor @Data class TestString implements Runnable { private final String lock; private int workingNo; private Interner<String> pool; @Override public void run () { synchronized (pool.intern(lock)) { log.info(lock + " ==>" + workingNo); } } }
区别是:
interns
常量池有限,存储在hashtable中,数据多了之后,碰撞厉害,而且容易加重full gc负担
Interners
内部基于ConcurrentHashMap实现,而且可以设置引用类型,不会加重full gc负担,但有一个问题就是如果gc回收了存储在Interners里面的String,那么pool.intern(lock) 可能也会返回不同的引用,总之,还是建议使用Interners,效率和内存使用率更高
类加载中synchronized的应用 类加载过程中为了控制并发的情况,使用synchronized控制。而synchronized控制并发主要使用锁对象的方式
使用ConcurrentHashMap存储加锁的对象。
获取锁对象的时候,先去ConcurrentHashMap获取对象,由于ConcurrentHashMap添加操作是并发安全的,最后保证不同线程加锁的时候获取到同一个对象。
synchronized加锁后,进入到类加载流程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class ClassLoader { private final ConcurrentHashMap<String, Object> parallelLockMap; protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { synchronized (getClassLoadingLock(name)) { Class<?> c = findLoadedClass(name); if (c == null ) { long t0 = System.nanoTime(); try { if (parent != null ) { c = parent.loadClass(name, false ); } else { c = findBootstrapClassOrNull(name); } } catch (ClassNotFoundException e) { } if (c == null ) { long t1 = System.nanoTime(); c = findClass(name); sun.misc.PerfCounter.getParentDelegationTime().addTime(t1 - t0); sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(t1); sun.misc.PerfCounter.getFindClasses().increment(); } } if (resolve) { resolveClass(c); } return c; } } protected Object getClassLoadingLock (String className) { Object lock = this ; if (parallelLockMap != null ) { Object newLock = new Object (); lock = parallelLockMap.putIfAbsent(className, newLock); if (lock == null ) { lock = newLock; } } return lock; } }
单订单重复退款请求
synchronize修饰退款方法。
缩小synchronize锁范围,使用对象锁。对象锁,创建弱引用的一个订单ID对象,放到统一的锁对象资源池中。
清理锁对象可以使用守护线程的方法,基于Unsafe的包操作去清除。
分布式应用,使用分布式锁来处理。
分布式锁的处理方案
数据库锁,数据库乐观锁,数据库悲观锁。
redis 锁 或者 ZooKeeper锁
使用消息队列顺序消费,保证不重复退款
消息批量发送设计 问题场景 某个活动需要对平台的客户进行短信的推销发送,假设对平台的10w用户推送某个活动。推送的用户数据由数据仓库已经推送到表t_user_promotion总共10w条数据。
而在调用短信批量发送服务的时候,经常有限制批量发送的手机号数目的,比如限制100个手机号。
如何对10w条消息进行发送
假设推送由运营人员进行触发,如何防止10w条消息出现重复触发的情况。
进阶问题:假如10w条消息,消息模板是不一致的,如何设计?
设计点:
表结构设计
查询效率问题
解决方法 方法1: 设计一张中间表作为批次表,每100个用户打一个批次。用户表中增加批次id。
整体发送的步骤如下:
用户表增加批次号字段,每100个号码打成一个批次,记录插入批次表。
调用消息队列异步处理该批次的短信消息。
消息队列消费,批次消息进行短信发送的调用,发送成功更新批次表状态。 好处为:短信发送与数据处理逻辑分离。
方法2:
用户表使用状态代表发送和未发送。
问题点: 该问题的难点在于如何处理多线程的分工及互斥问题。多线程三大核心问题:分工、协作、互斥。
分工:每个批量发送的接口只能固定发送100个手机号,因此每个线程负责100条记录的处理。
协作:该场景的线程协作,只用通知短信发送的主线线程这个数据处理完了。比如countDownLaunch,子线程完成之后countDown
互斥:每个线程负责处理100条数据,如何避免互斥问题。
解决分工及互斥几种方式:
避免互斥。
如果取10个线程,那么每个线程可以默认取主键id%10=y的记录进行处理,避免了互斥。
查询出所需要处理的记录,使用id排序,每个线程根据pageIndex+pageNo,进行数据的处理。limit pageIndex pageNo
sql可能会因为pageIndex+pageNo 导致慢查询,需要使用延迟关联的方式进行sql优化
加锁不推荐,悲观锁(使用mysql的行锁),乐观锁(使用mysql的version乐观锁)
进阶问题处理
如何解决重复发送问题,短信批量发送的请求更新为中间状态发送中。
消息模版不一致问题,使用数据分组,相同消息模版的数据统一处理。
相关类似资料 批量任务体现多线程的威力! JAVA实现多线程处理批量发送短信、APP推送
future编程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class Solution { public void test () { List<Future<String>> futureList = new ArrayList <>(); for (ChannelModel channel : channels) { Future<String> future = executorService .submit(() -> load(channel.getChannel())); futureList.add(future); } final long deadline = System.currentTimeMillis() + MAX_LOAD_SECONDS * 1000 ; for (int i = 0 ; i < futureList.size(); i++) { Future<RateModel> future = futureList.get(i); try { long timeLeft = deadline - System.currentTimeMillis(); if (timeLeft > 0 ) { String model = future.get(timeLeft, TimeUnit.MILLISECONDS); } else { if (future.isDone()) { RateModel rateModel = future.get(); } else { future.cancel(true ); } } } catch (InterruptedException e) { } } } }