【尚硅谷Java版】Flink1.13替换算子之科学分区
原创【尚硅谷Java版】Flink1.13转换运算符的物理分区。
分区:重新分配数据,并将其传递到不同的流分区,以便下一次处理。
物理分区 分类
物理分区 可以分为6类别分别为:
- 随机分区
- 轮询分区
- rescale分区
- 广播
- 全局分区
- 自定义重型分区
具体操作
1,随机分区:对于所有分区,洗牌
//1,随机分区 对于所有分区 进行洗牌
stream.shuffle().print("随机分区").setParallelism(4);
2,轮询分区:对于所有分区 进行发牌
//2,轮询分区 对于所有分区 进行发牌
stream.rebalance().print("轮询分区").setParallelism(4);
3、rescale重新缩放分区:对于所有分区先进行一个划分,然后再 在已划分的分区中,对数据进行轮询、分区和许可。以我们的例子为例,首先划分所有分区2部分,然后对于这两部分的许可,奇数在部分分区中。 偶数在某些分区中。
/*
3、rescale重新缩放分区 对于所有分区先进行一个划分,然后再 在已划分的分区中,对数据进行轮询和分区,并颁发许可证。
以我们的例子为例,首先划分所有分区2部分,然后对于这两部分的许可,奇数在部分分区中。 某些分区中的偶数
*/
env.addSource(new RichParallelSourceFunction() {
@Override
public void run(SourceContext sourceContext) throws Exception {
for (int i=0;i<8;i++){
//发送奇数和偶数0号到1数字并行分区
if(i%2==getRuntimeContext().getIndexOfThisSubtask()){
sourceContext.collect(i);
}
}
}
@Override
public void cancel() {
}
}).setParallelism(2)
.rescale()
.print()
.setParallelism(4);
4、 广播 将数据分发给以下所有子任务
//4、广播 将数据分发给以下所有子任务
stream.broadcast().print().setParallelism(2);
5,全局分区 将所有数据圈入一个分区
//5,全局分区 将所有数据圈入一个分区
stream.global().print().setParallelism(4);
6、自定义重型分区
//6、自定义重型分区
env.fromElements(1,2,3,4,5,6,7,8)
.partitionCustom(new Partitioner() {
@Override
public int partition(Integer key, int i) {
return key % 2;
}
}, new KeySelector() {
@Override
public Integer getKey(Integer integer) throws Exception {
return integer;
}
}).print().setParallelism(4)
;
测试
package com.atguigu.chapter05;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
/**
* @author potential
*/
public class TransformPartitionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//从元素读取数据
DataStreamSource stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Bob", "./prod?id=1", 3300L),
new Event("Bob", "./home", 3500L),
new Event("Alice", "./prod?id=200", 3000L),
new Event("Bob", "./prod?id=2", 3800L),
new Event("Bob", "./prod?id=3", 4200L)
);
//1,随机分区 对于所有分区 进行洗牌
stream.shuffle().print("随机分区").setParallelism(4);
//
//2,轮询分区 对于所有分区 进行发牌
stream.rebalance().print("轮询分区").setParallelism(4);
/*
3、rescale重新缩放分区 对于所有分区先进行一个划分,然后再 在已划分的分区中,对数据进行轮询和分区,并颁发许可证。
以我们的例子为例,首先划分所有分区2部分,然后对于这两部分的许可,奇数在部分分区中。 某些分区中的偶数
*/
env.addSource(new RichParallelSourceFunction() {
@Override
public void run(SourceContext sourceContext) throws Exception {
for (int i=0;i<8;i++){
//发送奇数和偶数0号到1数字并行分区
if(i%2==getRuntimeContext().getIndexOfThisSubtask()){
sourceContext.collect(i);
}
}
}
@Override
public void cancel() {
}
}).setParallelism(2)
.rescale()
.print()
.setParallelism(4);
//4、广播 将数据分发给以下所有子任务
stream.broadcast().print().setParallelism(2);
//5,全局分区 将所有数据圈入一个分区
stream.global().print().setParallelism(4);
//6、自定义重型分区
env.fromElements(1,2,3,4,5,6,7,8)
.partitionCustom(new Partitioner() {
@Override
public int partition(Integer key, int i) {
return key % 2;
}
}, new KeySelector() {
@Override
public Integer getKey(Integer integer) throws Exception {
return integer;
}
}).print().setParallelism(4)
;
env.execute();
}
}
测试结果:
1,随机分区
2,轮询分区
3、rescale重新缩放分区
从测试结果可以看出,1,3,5,7这些奇数都在3,4在这两个分区内,0,2,4,6这些偶数都在1,2在这两个部门内部。
4、广播
设置平行度2,使用广播的这个物理分区,将每条数据分配给每个频道。
5,全局分区
从测试结果来看,所有数据都在。1在此分区中。
6、自定义重型分区
我们的代码将分区分为两部分,尽管后来的并行性是这样的。4,但是 return key % 2 已经分为2所以并行在这里不起作用。我们在测试结果中看到,0,2,4,6,8这些偶数都在1在该分区中,1,3,5,7这些奇数都在2在此分区中。
版权声明
所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除
itfan123




