简单的生产者与消费者实现
BlockingQueue 实现的生产者与消费者
在传统的生产者消费者模型中,通常是采用
public class Main {
public static void main(String[] args) {
// 初始化阻塞队列
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1000);
// 创建生产者线程
Thread producer = new Thread(new Producer(blockingQueue, "temp.dat"));
producer.start();
// 创建消费者线程
Thread consumer = new Thread(new Consumer(blockingQueue));
consumer.start();
}
}
// 生产者
public class Producer implements Runnable {
private BlockingQueue<String> blockingQueue;
private String fileName;
private static final String FINIDHED = "EOF";
public Producer(BlockingQueue<String> blockingQueue, String fileName) {
this.blockingQueue = blockingQueue;
this.fileName = fileName;
}
@Override
public void run() {
try {
BufferedReader reader = new BufferedReader(new FileReader(new File(fileName)));
String line;
while ((line = reader.readLine()) != null) {
blockingQueue.put(line);
}
// 结束标志
blockingQueue.put(FINIDHED);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 消费者
public class Consumer implements Runnable {
private BlockingQueue<String> blockingQueue;
private static final String FINIDHED = "EOF";
public Consumer(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
String line;
String[] arrStr;
int ret;
try {
while (!(line = blockingQueue.take()).equals(FINIDHED)) {
// 消费
arrStr = line.split("\t");
if (arrStr.length != 2) {
continue;
}
ret = Integer.parseInt(arrStr[0]) + Integer.parseInt(arrStr[1]);
System.out.println(ret);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
上述使用了
Disruptor 实现
由于我们只需要将文件中的数据行读出,然后进行计算。因此,定义
public class FileData {
private String line;
public String getLine() {
return line;
}
public void setLine(String line) {
this.line = line;
}
}
然后用于产生
public class DisruptorFactory implements EventFactory<FileData> {
public FileData newInstance() {
return new FileData();
}
}
接下来消费者的作用是读取数据并进行处理。数据的读取已经由
public class DisruptorConsumer implements WorkHandler<FileData> {
private static final String FINIDHED = "EOF";
@Override
public void onEvent(FileData event) throws Exception {
String line = event.getLine();
if (line.equals(FINIDHED)) {
return;
}
// 消费
String[] arrStr = line.split("\t");
if (arrStr.length != 2) {
return;
}
int ret = Integer.parseInt(arrStr[0]) + Integer.parseInt(arrStr[1]);
System.out.println(ret);
}
}
生产者需要一个
public class DisruptorProducer {
private static final String FINIDHED = "EOF";
private final RingBuffer<FileData> ringBuffer;
public DisruptorProducer(RingBuffer<FileData> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void pushData(String line) {
long seq = ringBuffer.next();
try {
FileData event = ringBuffer.get(seq); // 获取可用位置
event.setLine(line); // 填充可用位置
} catch (Exception e) {
e.printStackTrace();
} finally {
ringBuffer.publish(seq); // 通知消费者
}
}
public void read(String fileName) {
try {
BufferedReader reader = new BufferedReader(new FileReader(new File(fileName)));
String line;
while ((line = reader.readLine()) != null) {
// 生产数据
pushData(line);
}
// 结束标志
pushData(FINIDHED);
} catch (Exception e) {
e.printStackTrace();
}
}
}
最后需要一个
public class DisruptorMain {
public static void main(String[] args) {
DisruptorFactory factory = new DisruptorFactory(); // 工厂
ExecutorService executor = Executors.newCachedThreadPool(); // 线程池
int BUFFER_SIZE = 16; // 必须为2的幂指数
// 初始化Disruptor
Disruptor<FileData> disruptor = new Disruptor<>(factory,
BUFFER_SIZE,
executor,
ProducerType.MULTI, // Create a RingBuffer supporting multiple event publishers to the one RingBuffer
new BlockingWaitStrategy() // 默认阻塞策略
);
// 启动消费者
disruptor.handleEventsWithWorkerPool(new DisruptorConsumer(),
new DisruptorConsumer()
);
disruptor.start();
// 启动生产者
RingBuffer<FileData> ringBuffer = disruptor.getRingBuffer();
DisruptorProducer producer = new DisruptorProducer(ringBuffer);
producer.read("temp.dat");
// 关闭
disruptor.shutdown();
executor.shutdown();
}
}
Disruptor 策略
- BlockingWaitStrategy:默认等待策略。和
BlockingQueue 的实现很类似,通过使用锁和条件(Condition)进行线程同步和唤醒。此策略对于线程切换来说,最节约CPU 资源,但在高并发场景下性能有限。 - SleepingWaitStrategy:
CPU 友好型策略。会在循环中不断等待数据。首先进行自旋等待,若不成功,则使用Thread.yield() 让出CPU ,并使用LockSupport.parkNanos(1) 进行线程睡眠。所以,此策略数据处理数据可能会有较高的延迟,适合用于对延迟不敏感的场景。优点是对生产者线程影响小,典型应用场景是异步日志。 - YieldingWaitStrategy:低延时策略。消费者线程会不断循环监控
RingBuffer 的变化,在循环内部使用Thread.yield() 让出CPU 给其他线程。 - BusySpinWaitStrategy:死循环策略。消费者线程会尽最大可能监控缓冲区的变化,会占用所有
CPU 资源。