java多线程生产者消费者(java 生产者消费者是设计模式吗)
本文目录
- java 生产者消费者是设计模式吗
- java实现生产者和消费者问题的几种方式
- java 多线程 为什么不会打印一个生产者之后再打一个消费者 而是等生产者打印完才打生产者 求详解 谢谢
- 并行模式之生产者-消费者模式
- java多线程关于消费者和生产者,求源程序,求大神解答愿意提高报酬
- JAVA 中多线程的问题 生产这消费者问题
- • 利用java多线程及同步机制实现生产者-消费者模型
- java多线程,怎么算出最佳的生产消费的配比比例
- Java多线程如何才能协调好生产和消费的关系
- 由生产者/消费者问题看JAVA多线程
java 生产者消费者是设计模式吗
对于多线程程序来说,不管任何编程语言,生产者和消费者模型都是最经典的。就像学习每一门编程语言一 样,Hello World!都是最经典的例子。 实际上,准确说应该是“生产者-消费者-仓储”模型,离开了仓储,生产者消费者模型就显得没有说服力了。 对于此模型,应该明确一下几点: 1、生产者仅仅在仓储未满时候生产,仓满则停止生产。 2、消费者仅仅在仓储有产品时候才能消费,仓空则等待。 3、当消费者发现仓储没产品可消费时候会通知生产者生产。 4、生产者在生产出可消费产品时候,应该通知等待的消费者去消费。 此模型将要结合java.lang.Object的wait与notify、notifyAll方法来实现以上的需求。这是非常重要的。
java实现生产者和消费者问题的几种方式
生产者消费者问题是多线程的一个经典问题,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。解决生产者/消费者问题的方法可分为两类:采用某种机制保护生产者和消费者之间的同步;在生产者和消费者之间建立一个管道。第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。在Java中有四种方法支持同步,其中前三个是同步方法,一个是管道方法。wait()/notify()方法await()/signal()方法BlockingQueue阻塞队列方法PipedInputStream/PipedOutputStream通过wait()/notify()方法实现:wait()/nofity()方法是基类Object的两个方法:wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等等状态,让其他线程执行。notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。通过await()/signal()方法实现:await()和signal()的功能基本上和wait()/nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。通过BlockingQueue方法实现:它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await()/signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法:put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
java 多线程 为什么不会打印一个生产者之后再打一个消费者 而是等生产者打印完才打生产者 求详解 谢谢
楼主,你的代码写的不对。应该这样做,生产者每生产一个,生产完成后就通知消费者消费,这样才能做到生产一个消费一个。解决的方法:1、生产的地方加上同步等待,生产时当检查到库存有货,则等待(wait)消费者消费后再生产;生产时如果库存没货,则直接生产,生产完成则通知(notifyAll)消费者消费。2、消费的地方加上同步等待,消费时如果发现没有库存,则等待(wait)生产者生产后再消费;消费时如果有库存,则直接消费,消费后通知(notifyAll)生产者生产。 代码我就不帮你写了,你上面的大概功能都能写出来,相信这两个细节问题你肯定能修改好。具体修改的地方,就是你上面的类SyncStack中的pop、push方法修改。 有问题欢迎提问,满意请采纳!
并行模式之生产者-消费者模式
生产者-消费者模式是一种经典的多线程设计模式。它通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责处理生产者提交的任务。两者线程通过共享内存缓冲区进行通信。 生产者线程将任务提交到共享内存缓冲区,消费者线程并不直接与生产者线程通信,而是在共享内存缓冲区中获取任务,并进行处理。共享内存缓冲区是其核心组件,它负责生产者和消费者之间的通信,避免两种直接通信,生产者和消费者都不需要知道对方的存在。由于共享内存缓冲区的存在,它允许生产者和消费者在执行速度上存在时间差,无论哪一方的速度高于对方,都可以通过共享内存缓冲区得到缓解,保证系统正常运行。 生产者-消费者模式的主要角色及作用: 生产者:用于提交用户请求,提取用户任务,并装入内存缓冲区 消费者:在内存缓冲区中提取处理任务 内存缓冲区:缓存生产者提交的任务或数据,供消费者使用 任务:生产者向内存缓冲区提交的数据结构 Main:使用生产者和消费者的客户端 BlockingQueue充当了共享内存缓存区,用于维护任务或数据队列。PCData对象表示一个生产任务或数据。生产者和消费者对象均引用同一个BlockingQueue对象实例,生产者负责创建PCData对象,并将它加入到BlockingQueue中,消费者则从BlockingQueue中获取PCData。--参考文献《实战Java高并发程序设计》
java多线程关于消费者和生产者,求源程序,求大神解答愿意提高报酬
自己看代码体会吧
import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;public class BlockingQueueTest { public static void main(String args) { ExecutorService service = Executors.newCachedThreadPool(); BlockingQueue《String》 blockingQueue = new LinkedBlockingQueue《String》(100); System.out.println("blockingQueue now contains " + blockingQueue.size() + " unit"); service.submit(new Consumer1(blockingQueue)); gap(blockingQueue); service.submit(new Productor2(blockingQueue)); gap(blockingQueue); service.submit(new Productor3(blockingQueue)); gap(blockingQueue); service.submit(new Productor4(blockingQueue)); gap(blockingQueue); service.submit(new Productor5(blockingQueue)); gap(blockingQueue); service.shutdown(); } private static void gap(BlockingQueue《String》 blockingQueue) { try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("blockingQueue now contains " + blockingQueue.size() + " unit"); }}class Consumer1 implements Runnable{ BlockingQueue《String》 blockingQueue; public Consumer1(BlockingQueue《String》 blockingQueue) { super(); this.blockingQueue = blockingQueue; } @Override public void run() { // TODO Auto-generated method stub System.out.println("Consumer1 start: need 10 units"); for(int i = 0; i 《 10; i++){ try { blockingQueue.take(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println("Consumer1 end: has got 10 units"); } }class Productor2 implements Runnable{ BlockingQueue《String》 blockingQueue; public Productor2(BlockingQueue《String》 blockingQueue) { super(); this.blockingQueue = blockingQueue; } @Override public void run() { // TODO Auto-generated method stub System.out.println("Productor2 start: put 5 units"); for(int i = 0; i 《 5; i++){ try { blockingQueue.put("Object"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println("Productor2 end: has put 5 units"); } }class Productor3 implements Runnable{ BlockingQueue《String》 blockingQueue; public Productor3(BlockingQueue《String》 blockingQueue) { super(); this.blockingQueue = blockingQueue; } @Override public void run() { // TODO Auto-generated method stub System.out.println("Productor3 start: put 5 units"); for(int i = 0; i 《 5; i++){ try { blockingQueue.put("Object"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println("Productor3 end: has put 5 units"); } }class Productor4 implements Runnable{ BlockingQueue《String》 blockingQueue; public Productor4(BlockingQueue《String》 blockingQueue) { super(); this.blockingQueue = blockingQueue; } @Override public void run() { // TODO Auto-generated method stub System.out.println("Productor4 start: put 100 units"); for(int i = 0; i 《 100; i++){ try { blockingQueue.put("Object"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println("Productor4 end: has put 100 units"); } }class Productor5 implements Runnable{ BlockingQueue《String》 blockingQueue; public Productor5(BlockingQueue《String》 blockingQueue) { super(); this.blockingQueue = blockingQueue; } @Override public void run() { // TODO Auto-generated method stub System.out.println("Productor5 start: put 10 units"); for(int i = 0; i 《 100; i++){ try { blockingQueue.put("Object"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println("Productor5 end: has put 10 units"); } }
每个线程是隔了1s启动的, 结果
blockingQueue now contains 0 unitConsumer1 start: need 10 unitsblockingQueue now contains 0 unitProductor2 start: put 5 unitsProductor2 end: has put 5 unitsblockingQueue now contains 0 unitProductor3 start: put 5 unitsProductor3 end: has put 5 unitsConsumer1 end: has got 10 unitsblockingQueue now contains 0 unitProductor4 start: put 100 unitsProductor4 end: has put 100 unitsblockingQueue now contains 100 unitProductor5 start: put 10 unitsblockingQueue now contains 100 unit
JAVA 中多线程的问题 生产这消费者问题
不要加else 否则无法唤醒生产者线程
if(sum《=0){System.out.println("仓库中已经没有产品");try{this.wait();}catch(Exception e){}}System.out.println("消费者正在消费第"+sum+"号产品");sum--;this.notifyAll();
• 利用java多线程及同步机制实现生产者-消费者模型
制片人:时存在的产品的时候,消费者生产的产品是不够的:消费类产品关键:当产品到达上限,停止生产,通知消费者的消费关键2:当达到下限,停止消费,并通知生产者生产共享数据: 代码进口的java.util.ArrayList; BR /》公共类ProducerAndConsumer { ArrayList的产品=新的ArrayList (); MAX = 10; 布尔取消; 同步失效产生(){ int的大小= products.size(); (大小《MAX){ products.add(大小+“”); } { notifyAll()的; } } 同步的无效消耗(){(products.isEmpty()){尝试{的wait(); }(InterruptedException的E){ e.printStackTrace(); } } System.out.println( “消费products.remove:”+(0)); } 私有类生产扩展Thread { @覆盖公共无效的run(){...... /》(取消){农产品(); } } } 私人阶层的消费扩展Thread { @覆盖公共无效的run(){(取消){消耗(); } } } /》无效取消(){取消= TRUE; } 的公共ProducerAndConsumer(){新的消费者()。开始(); 新的的监制()。开始();} 公共静态无效的主要(字符串 ARGS){新ProducerAndConsumer(); } }
java多线程,怎么算出最佳的生产消费的配比比例
这和生产者和消费者线程耗时有关,
最简单的一个公式:
生产者线程数*消费者平均耗时=消费者线程数*生产者平均耗时
得出: 生产者线程数/消费者线程数=消费者平均耗时/生产者平均耗时。
得出比例后就需要确定具体的线程数了,线程数需要根据业务量大小确实,这个没办法根据公式计算。同时,线程数还受到应用类型和cpu核心数的影响,一般说来,大家认为线程池的大小经验值应该这样设置:(其中N为CPU的个数)
如果是CPU密集型应用,则线程池大小设置为N+1
如果是IO密集型应用,则线程池大小设置为2N+1
总之线程的配置是一个比较繁琐的过程,需要通过不断的试验找到最佳的配置。
Java多线程如何才能协调好生产和消费的关系
Java多线程:是程序中的一个单一的连续控制流程,一个线程可以拥有多个线程 。记得刚学习Java多线程的时候,对线程中的run()不知道是什么意思,现在大胆认为它就像Java的main()一样,可以理解为一个线程启动运行的入口函数。创建一个线程的方式有两种,一种是继承Thread类,还有就是实现Runnable 接口,两者都要重写run。时间偏分给我们想要执行的线程时,可以将该线程的优先级设置Thread.MAX_PRIORITY .下面是一个生产者和消费者的多线程的例子:其规则很简单,只有生产出来东西才能有东西来消费。Java多线程知识点:线程的创建、线程的同步、顺便回顾一下大学的操作系统。class Test { public static void main(String args) { Queue q = new Queue(); Producer p = new Producer(q); Consumer c = new Consumer(q); p.start(); c.start(); } } class Producer extends Thread { Queue q; Producer(Queue q) { this.q = q; } public void run() { for (int i = 0; i 《 10; i++) { q.put(i); System.out.println("Producer put " + i); } } } class Consumer extends Thread { Queue q; Consumer(Queue q) { this.q = q; } public void run() { while (true) { System.out.println("Consumer get " + q.get()); } } } class Queue { int value; boolean bFull = false; public synchronized void put(int i) { if (!bFull) { value = i; bFull = true; notify(); } try { wait(); } catch (Exception e) { e.printStackTrace(); } } public synchronized int get() { if (!bFull) { try { wait(); } catch (Exception e) { e.printStackTrace(); } } bFull = false; notify(); return value; } }
由生产者/消费者问题看JAVA多线程
生产者消费者问题是研究多线程程序时绕不开的问题 它的描述是有一块生产者和消费者共享的有界缓冲区 生产者往缓冲区放入产品 消费者从缓冲区取走产品 这个过程可以无休止的执行 不能因缓冲区满生产者放不进产品而终止 也不能因缓冲区空消费者无产品可取而终止
解决生产者消费者问题的方法有两种 一种是采用某种机制保持生产者和消费者之间的同步 一种是在生产者和消费者之间建立一个管道 前一种有较高的效率并且可控制性较好 比较常用 后一种由于管道缓冲区不易控制及被传输数据对象不易封装等原因 比较少用
同步问题的核心在于 CPU是按时间片轮询的方式执行程序 我们无法知道某一个线程是否被执行 是否被抢占 是否结束等 因此生产者完全可能当缓冲区已满的时候还在放入产品 消费者也完全可能当缓冲区为空时还在取出产品
现在同步问题的解决方法一般是采用信号或者加锁机制 即生产者线程当缓冲区已满时放弃自己的执行权 进入等待状态 并通知消费者线程执行 消费者线程当缓冲区已空时放弃自己的执行权 进入等待状态 并通知生产者线程执行 这样一来就保持了线程的同步 并避免了线程间互相等待而进入死锁状态
JAVA语言提供了独立于平台的线程机制 保持了 write once run anywhere 的特色 同时也提供了对同步机制的良好支持
在JAVA中 一共有四种方法支持同步 其中三个是同步方法 一个是管道方法
方法wait()/notify()
方法await()/signal()
阻塞队列方法BlockingQueue
管道方法PipedInputStream/PipedOutputStream
下面我们看各个方法的实现
方法wait()/notify()
wait()和notify()是根类Object的两个方法 也就意味着所有的JAVA类都会具有这个两个方法 为什么会被这样设计呢?我们可以认为所有的对象默认都具有一个锁 虽然我们看不到 也没有办法直接操作 但它是存在的
wait()方法表示 当缓冲区已满或空时 生产者或消费者线程停止自己的执行 放弃锁 使自己处于等待状态 让另一个线程开始执行
notify()方法表示 当生产者或消费者对缓冲区放入或取出一个产品时 向另一个线程发出可执行通知 同时放弃锁 使自己处于等待状态
下面是一个例子代码
import java util LinkedList;
public class Sycn {
private LinkedList《Object》 myList =new LinkedList《Object》();
private int MAX = ;
public Sycn (){
}
public void start(){
new Producer() start();
new Consumer() start();
}
public static void main(String args) throws Exception{
Sycn s = new Sycn ();
s start();
}
class Producer extends Thread{
public void run(){
while(true){
synchronized(myList){
try{
while(myList size() == MAX){
System out println( warning: it s full! );
myList wait();
}
Object o = new Object();
if(myList add(o)){
System out println( Producer: + o);
myList notify();
}
}catch(InterruptedException ie){
System out println( producer is interrupted! );
}
}
}
}
}
class Consumer extends Thread{
public void run(){
while(true){
synchronized(myList){
try{
while(myList size() == ){
System out println( warning: it s empty! );
myList wait();
}
Object o = myList removeLast();
System out println( Consumer: + o);
myList notify();
}catch(InterruptedException ie){
System out println( consumer is interrupted! );
}
}
}
}
}
}
方法await()/signal()
在JDK 以后 JAVA提供了新的更加健壮的线程处理机制 包括了同步 锁定 线程池等等 它们可以实现更小粒度上的控制 await()和signal()就是其中用来做同步的两种方法 它们的功能基本上和wait()/notify()相同 完全可以取代它们 但是它们和新引入的锁定机制Lock直接挂钩 具有更大的灵活性
下面是一个例子代码
import java util LinkedList;
import ncurrent locks *;
public class Sycn {
private LinkedList《Object》 myList = new LinkedList《Object》();
private int MAX = ;
private final Lock lock = new ReentrantLock();
private final Condition full = lock newCondition();
private final Condition empty = lock newCondition();
public Sycn (){
}
public void start(){
new Producer() start();
new Consumer() start();
}
public static void main(String args) throws Exception{
Sycn s = new Sycn ();
s start();
}
class Producer extends Thread{
public void run(){
while(true){
lock lock();
try{
while(myList size() == MAX){
System out println( warning: it s full! );
full await();
}
Object o = new Object();
if(myList add(o)){
System out println( Producer: + o);
empty signal();
}
}catch(InterruptedException ie){
System out println( producer is interrupted! );
}finally{
lock unlock();
}
}
}
}
class Consumer extends Thread{
public void run(){
while(true){
lock lock();
try{
while(myList size() == ){
System out println( warning: it s empty! );
empty await();
}
Object o = myList removeLast();
System out println( Consumer: + o);
full signal();
}catch(InterruptedException ie){
System out println( consumer is interrupted! );
}finally{
lock unlock();
}
}
}
}
}
阻塞队列方法BlockingQueue
BlockingQueue也是JDK 的一部分 它是一个已经在内部实现了同步的队列 实现方式采用的是我们的第 种await()/signal()方法 它可以在生成对象时指定容量大小
它用于阻塞操作的是put()和take()方法
put()方法类似于我们上面的生产者线程 容量最大时 自动阻塞
take()方法类似于我们上面的消费者线程 容量为 时 自动阻塞
下面是一个例子代码
import ncurrent *;
public class Sycn {
private LinkedBlockingQueue《Object》 queue = new LinkedBlockingQueue《Object》( );
private int MAX = ;
public Sycn (){
}
public void start(){
new Producer() start();
new Consumer() start();
}
public static void main(String args) throws Exception{
Sycn s = new Sycn ();
s start();
}
class Producer extends Thread{
public void run(){
while(true){
//synchronized(this){
try{
if(queue size() == MAX)
System out println( warning: it s full! );
Object o = new Object();
queue put(o);
System out println( Producer: + o);
}catch(InterruptedException e){
System out println( producer is interrupted! );
}
//}
}
}
}
class Consumer extends Thread{
public void run(){
while(true){
//synchronized(this){
try{
if(queue size() == )
System out println( warning: it s empty! );
Object o = queue take();
System out println( Consumer: + o);
}catch(InterruptedException e){
System out println( producer is interrupted! );
}
//}
}
}
}
}
你发现这个例子中的问题了吗?
如果没有 我建议你运行一下这段代码 仔细观察它的输出 是不是有下面这个样子的?为什么会这样呢?
…
warning: it s full!
Producer: java lang object@ e a
…
你可能会说这是因为put()和System out println()之间没有同步造成的 我也这样认为 我也这样认为 但是你把run()中的synchronized前面的注释去掉 重新编译运行 有改观吗?没有 为什么?
这是因为 当缓冲区已满 生产者在put()操作时 put()内部调用了await()方法 放弃了线程的执行 然后消费者线程执行 调用take()方法 take()内部调用了signal()方法 通知生产者线程可以执行 致使在消费者的println()还没运行的情况下生产者的println()先被执行 所以有了上面的输出 run()中的synchronized其实并没有起什么作用
对于BlockingQueue大家可以放心使用 这可不是它的问题 只是在它和别的对象之间的同步有问题
对于这种多重嵌套同步的问题 以后再谈吧 欢迎大家讨论啊!
管道方法PipedInputStream/PipedOutputStream
这个类位于java io包中 是解决同步问题的最简单的办法 一个线程将数据写入管道 另一个线程从管道读取数据 这样便构成了一种生产者/消费者的缓冲区编程模式
下面是一个例子代码 在这个代码我没有使用Object对象 而是简单的读写字节值 这是因为PipedInputStream/PipedOutputStream不允许传输对象 这是JAVA本身的一个bug 具体的大家可以看sun的解释 _bug do?bug_id=
import java io *;
public class Sycn {
private PipedOutputStream pos;
private PipedInputStream pis;
//private ObjectOutputStream oos;
//private ObjectInputStream ois;
public Sycn (){
try{
pos = new PipedOutputStream();
pis = new PipedInputStream(pos);
//oos = new ObjectOutputStream(pos);
//ois = new ObjectInputStream(pis);
}catch(IOException e){
System out println(e);
}
}
public void start(){
new Producer() start();
new Consumer() start();
}
public static void main(String args) throws Exception{
Sycn s = new Sycn ();
s start();
}
class Producer extends Thread{
public void run() {
try{
while(true){
int b = (int) (Math random() * );
System out println( Producer: a byte the value is + b);
pos write(b);
pos flush();
//Object o = new MyObject();
//oos writeObject(o);
//oos flush();
//System out println( Producer: + o);
}
}catch(Exception e){
//System out println(e);
e printStackTrace();
}finally{
try{
pos close();
pis close();
//oos close();
//ois close();
}catch(IOException e){
System out println(e);
}
}
}
}
class Consumer extends Thread{
public void run(){
try{
while(true){
int b = pis read();
System out println( Consumer: a byte the value is + String valueOf(b));
//Object o = ois readObject();
//if(o != null)
//System out println( Consumer: + o);
}
}catch(Exception e){
//System out println(e);
e printStackTrace();
}finally{
try{
pos close();
pis close();
//oos close();
//ois close();
}catch(IOException e){
System out println(e);
}
}
}
}
//class MyObject implements Serializable {
//}
lishixinzhi/Article/program/Java/gj/201311/27617
更多文章:
心灵感应游戏(心灵感应 游戏 一人指物 另一人猜 总是可以猜到围观者指定的物品 没有停顿 没有动作眼神等暗示 没)
2024年6月22日 06:05
有什么好玩的PSP游戏适合女生?PSP里有什么经典的游戏,大家推荐下
2024年5月10日 12:52
ubuntu 中文输入法(怎样才能在 UBUNTU中输入中文)
2024年7月1日 18:03
蘑菇云root下载(手机HTL V11安卓4.0.4怎么获得root权限)
2024年5月1日 18:16