242 字
1 分钟
Dart Stream 相关
基本示例
import 'dart:async';
void main() async {
final controller = StreamController<int>();
Stream<int> stream = controller.stream;
StreamSink<int> sink = controller.sink;
StreamSubscription<int> subscription = stream.listen(
(data) => print('接收到数据: $data'),
onError: (error) => print('错误: $error'),
onDone: () => print('流已关闭'),
);
sink.add(1);
sink.add(2);
sink.add(3);
await Future.delayed(Duration(seconds: 1));
sink.addError('发生错误');
await controller.close();
await subscription.cancel();
}
高级用法
import 'dart:async';
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
yield i;
}
}
void main() async {
final controller = StreamController<int>.broadcast();
Stream<int> broadcastStream = controller.stream;
StreamSubscription<int> subscription1 = broadcastStream.listen(
(data) => print('订阅1: $data'),
onDone: () => print('订阅1完成'),
);
StreamSubscription<int> subscription2 = broadcastStream.listen(
(data) => print('订阅2: $data'),
onDone: () => print('订阅2完成'),
);
controller.add(1);
controller.add(2);
Stream<int> countToFive = countStream(5);
await for (final count in countToFive) {
controller.add(count);
}
subscription1.pause();
controller.add(6);
controller.add(7);
subscription1.resume();
await controller.close();
await subscription1.cancel();
await subscription2.cancel();
}
- StreamController 用于创建和控制 Stream。
- Stream 可以被多次订阅(广播模式)。
- StreamSink (controller.sink) 用于向 Stream 添加数据。
- StreamSubscription 用于管理订阅,如暂停、恢复和取消。
- 可以使用 async* 和 yield 创建自定义 Stream。
- 可以使用 await for 循环来处理 Stream 中的数据。