什么是Stream

Dart中的异步编程有2个部分组成,分别是:FutureStream

Future前面已经讲过,它代表异步的不会立即完成的操作,并且能够等待结果的完成。

Stream是非阻塞的异步的数据序列,它不是采用等待的方式,而是采用监听的方式来得知数据的状态,当数据准备好时会通过回调通知我们。

把去银行存钱比作异步操作,Future的做法是站在前一个人的身后等着,同时不耽误自己做事(比如玩手机),等他办完你立刻上去;Stream的做法是取号,然后去大厅等着,也不耽误自己做事,等轮到你的号时会有大喇叭通知你。两者的作法本质上区别不大,仅仅是形式不一样,都是异步非阻塞。

创建Stream

Dart提供了多种API来创建Stream,常用的有如下几种:

  • Stream.fromIterable() ,将一个可迭代类型转为Stream
  • Stream.fromFuture(),将一个Future转为Stream
  • StreamController,允许我们自定义Stream准备数据的逻辑

Stream.fromIterable()

使用Stream.fromIterable方法来创建Stream,所有的集合都是可迭代类型,我们可以将一个集合转为Stream,代码如下:

var stream = Stream.fromIterable([1, 2, 3]);

数据序列创建好了,接下来使用listen()方法来监听数据,代码如下:

var stream = Stream.fromIterable([1, 2, 3]);
//由于数据是直接创建好的,所以序列准备数据的速度非常快,几乎是瞬间
stream.listen((data) {
    print("data: $data"); //异步执行
});
print("hello bike!");
//输出
hello stream!
data: 1
data: 2
data: 3

Stream.fromFuture()

使用Stream.fromFuture()方法来创建Stream,上面的例子中,数据序列是瞬间准备好数据。这次我们制造一个延时准备数据的Stream,代码如下:

//延时2秒返回
Future<int> makeFuture() async{
  await Future.delayed(Duration(seconds: 2));
  return 100;
}
//通过Future创建一个延时2秒准备好数据的序列
var stream = Stream.fromFuture(makeFuture());
stream.listen(print);
print("hello stream!");
//输出
hello stream!
100

还提供了一个方法Stream.fromFutures(futures)可以从多个Future中创建Stream,每当一个Future返回时,Stream就会准备(生产)一次数据。

StreamController

使用StreamController来创建Stream,上面的两种方法都是靠别人来产生数据,我们无法控制产生数据的逻辑,StreamController可以让我们自己来定义数据该如何产生。

现在要实现一个能定时生产int数据的Stream,代码如下:

Stream<int> intStream(Duration interval) {
  var controller = StreamController<int>();
  var start = 0; //起始值
  //执行定时任务
  Timer.periodic(interval, (timer) {
    start++;
    controller.add(start); //每隔一个interval周期,就准备一次数据
  });
  return controller.stream;
}

但上面这个Stream会一直生产数据,直到天荒地老,设备断电,因为我们没有设置终止条件。

来监听下这个无限Stream,并将数据打印出来:

intStream(Duration(seconds: 1)).listen(print);
print("hello stream!");
//输出
hello stream!
1
2
3
4
...

如何让序列在一定条件终止呢?

我们给intStream()方法增加一个可选参数,用来限制序列的最大值,当达到最大值时就停止生产。代码如下:

Stream<int> intStream(Duration interval, [int max]) {
  var controller = StreamController<int>();
  var start = 0; //起始值
  //执行定时任务
  Timer.periodic(interval, (timer) {
    start++;
    controller.add(start); //每隔一个interval周期,就准备一次数据
    if (max != null && start >= max) {
      timer.cancel(); //停止定时器
      controller.close(); //终止序列
    }
  });
  return controller.stream;
}
intStream(Duration(seconds: 1), 3).listen(print);
print("hello stream!");
//输出
hello stream!
1
2
3

当序列的值达到传入的max时,就取消定时器,并关闭序列,不再生产数据。

接收数据

要接收Stream的数据,除了使用listen()方法进行监听外,还可以使用await for来获取数据。代码如下:

await for (var v in intStream(Duration(seconds: 1), 5)){
    print(v);
}

注意:await for必须在async方法中才能使用。

等待订阅

继续上面的例子,当调用一个Stream对象的listen()方法时,就表示对这个对象进行订阅或监听。但如果不调用listen()方法,Stream还会不会生产数据呢?

答案是会。

在上面的例子中,即使一直没有订阅,Stream对象从创建开始就会进行生产数据,如果没有最大值限制,它会一直生产数据不会停止。生成的数据会被Stream缓存起来,等有人订阅时,会将缓存的数据瞬间全部丢给订阅者。

编写一个延时订阅Stream的方法:

void listenWithDelay() async {
  var stream = intStream(Duration(seconds: 1));
  //等待5秒之后再订阅数据
  await Future.delayed(Duration(seconds: 5));
  stream.listen(print);//监听并打印
}
main() async {
  await listenWithDelay();
  print("hello stream!");
}

执行代码会输出如下内容:

hello stream! #先打印这行
#等待5秒,瞬间输出1,2,3,4,5
1
2
3
4
5
... #继续输出

如果生产的数据一直没有人使用,很容易造成内存泄露。很多情况下,我们希望等有人订阅了再开始生产数据,在构造Stream的时候,可以传入onListen参数,该参数是一个方法,一旦有人订阅就会执行。

修改intStream()方法的代码如下:

Stream<int> intStream(Duration interval, [int max]) {
  Timer timer;
  StreamController controller;
  var start = 0; //起始值

  //当有人订阅就会执行
  void startTimer(){
    timer = Timer.periodic(interval, (timer) {
      start++;
      controller.add(start); //每隔一个interval周期,就准备一次数据
      if (max != null && start >= max) {
        timer.cancel(); //停止定时器
        controller.close(); //终止序列
      }
    });
  }
  controller = StreamController<int>(onListen: startTimer);
  return controller.stream;
}
await listenWithDelay();
print("hello stream!");
//等待5秒之后输出
hello stream!
1
2
3
...

暂停与恢复

有时候我们从Stream中取了一段数据之后,希望先干点别的事,待会儿再来取。此时需要先暂停Stream,在暂停期间,Stream也应该不再生产数据了;等我们回来,Stream再继续生产数据。

Stream.listen()方法返回的是一个StreamSubscription对象,该对象提供了pause()resume()方法来暂停和恢复Stream。而Stream在构造时也可以传入onPauseonResume等参数,在这2个参数中我们去做一些暂停和恢复的逻辑。

修改intStream()方法代码如下:

Stream<int> intStream(Duration interval, [int max]) {
  Timer timer;
  StreamController controller;
  var start = 0; //起始值

  //当有人订阅就会执行
  void startTimer() {
    timer = Timer.periodic(interval, (timer) {
      start++;
      controller.add(start); //每隔一个interval周期,就准备一次数据
      if (max != null && start >= max) {
        timer.cancel(); //停止定时器
        controller.close(); //终止序列
      }
    });
  }

  void stopTimer() {
    if (timer != null) {
      timer.cancel();
      timer = null;
    }
  }

  controller = StreamController<int>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer);
  return controller.stream;
}

然后编写一个方法,在某个条件下暂停Stream,过一会儿再恢复Stream。代码如下:

void listenWithPause() async {
  var stream = intStream(Duration(seconds: 1));
  StreamSubscription listener;
  listener = stream.listen((el) {
    print(el);
    if (el == 3) {
      //暂停Stream,3秒钟之后自动恢复
      listener.pause(Future.delayed(Duration(seconds: 3)));
    }
  });
}
main() async {
  await listenWithPause();
  print("hello stream!");
}

运行代码,输入如下:

# 依次输出
hello stream!
1
2
3
# 暂停3秒后继续输出
4
5
6
...

配合StreamController提供的状态回调方法,我们可以实现出各种复杂逻辑的数据序列。

Stream高阶操作

Stream作为数据序列,本质上也是一些列数据的集合,只不过是异步计算的。所以也有像集合那样的高阶函数用来处理数据,来看看几个常用的。

  • map操作

    intStream(Duration(seconds: 1)).map((el)=> el * 10).listen(print)
    
  • reduce操作

    print(await intStream(Duration(seconds: 1), 5).reduce((e1,e2)=> e1 + e2));
    

    reduce方法返回的是Future对象,需要await结果。需要注意的是,我们给intStream()方法传入了最大值。如果不限制最大值,就是一个无限生产数据的序列,那将会有无限的reduce计算,会造成await永远也等不到结果。

  • join操作

    print(await intStream(Duration(seconds: 1), 5).join("-"));
    

    join操作返回的也是Future对象,注意需要传入最大值哦。

  • 筛选操作

    intStream(Duration(seconds: 1)).where((el) => el % 2 == 0).listen(print);
    

    上面的代码会筛选出序列中所有的偶数并打印出来。

  • 部分提取

    //只取前2个数据
    intStream(Duration(seconds: 1)).take(2).listen(print);
    //跳过前2个数据
    intStream(Duration(seconds: 1)).skip(2).listen(print);
    

Stream的高阶操作函数有很多,我不可能把所有的都讲完,剩下的需要你举一反三了。

更新时间: 5/6/2019, 11:44:18 AM