【尚硅谷Java版】Flink1.13替换算子之科学分区

原创
小哥 3年前 (2022-11-07) 阅读数 5 #大杂烩

【尚硅谷Java版】Flink1.13转换运算符的物理分区。

分区:重新分配数据,并将其传递到不同的流分区,以便下一次处理。

物理分区 分类

物理分区 可以分为6类别分别为:

  1. 随机分区
  2. 轮询分区
  3. rescale分区
  4. 广播
  5. 全局分区
  6. 自定义重型分区

具体操作

1,随机分区:对于所有分区,洗牌

 //1,随机分区 对于所有分区 进行洗牌
    stream.shuffle().print("随机分区").setParallelism(4);

2,轮询分区:对于所有分区 进行发牌

 //2,轮询分区  对于所有分区 进行发牌
        stream.rebalance().print("轮询分区").setParallelism(4);

3、rescale重新缩放分区:对于所有分区先进行一个划分,然后再 在已划分的分区中,对数据进行轮询、分区和许可。以我们的例子为例,首先划分所有分区2部分,然后对于这两部分的许可,奇数在部分分区中。 偶数在某些分区中。

/*
        3、rescale重新缩放分区  对于所有分区先进行一个划分,然后再 在已划分的分区中,对数据进行轮询和分区,并颁发许可证。
           以我们的例子为例,首先划分所有分区2部分,然后对于这两部分的许可,奇数在部分分区中。 某些分区中的偶数
        */
        env.addSource(new RichParallelSourceFunction() {

            @Override
            public void run(SourceContext sourceContext) throws Exception {

                for (int i=0;i<8;i++){
                    //发送奇数和偶数0号到1数字并行分区
                    if(i%2==getRuntimeContext().getIndexOfThisSubtask()){
                        sourceContext.collect(i);
                    }
                }
            }
            @Override
            public void cancel() {
            }
        }).setParallelism(2)
                .rescale()
                .print()
                .setParallelism(4);

4、 广播 将数据分发给以下所有子任务

//4、广播 将数据分发给以下所有子任务
        stream.broadcast().print().setParallelism(2);

5,全局分区 将所有数据圈入一个分区

  //5,全局分区   将所有数据圈入一个分区
    stream.global().print().setParallelism(4);

6、自定义重型分区

//6、自定义重型分区
env.fromElements(1,2,3,4,5,6,7,8)
        .partitionCustom(new Partitioner() {
            @Override
            public int partition(Integer key, int i) {
                return key % 2;
            }

        }, new KeySelector() {

            @Override
            public Integer getKey(Integer integer) throws Exception {
                return integer;
            }
        }).print().setParallelism(4)
;

测试

package com.atguigu.chapter05;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 * @author potential
 */
public class TransformPartitionTest {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //从元素读取数据
        DataStreamSource stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Bob", "./home", 3500L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L)

        );

        //1,随机分区 对于所有分区 进行洗牌
        stream.shuffle().print("随机分区").setParallelism(4);

//
        //2,轮询分区  对于所有分区 进行发牌
        stream.rebalance().print("轮询分区").setParallelism(4);

        /*
        3、rescale重新缩放分区  对于所有分区先进行一个划分,然后再 在已划分的分区中,对数据进行轮询和分区,并颁发许可证。
           以我们的例子为例,首先划分所有分区2部分,然后对于这两部分的许可,奇数在部分分区中。 某些分区中的偶数
        */
        env.addSource(new RichParallelSourceFunction() {

            @Override
            public void run(SourceContext sourceContext) throws Exception {

                for (int i=0;i<8;i++){
                    //发送奇数和偶数0号到1数字并行分区
                    if(i%2==getRuntimeContext().getIndexOfThisSubtask()){
                        sourceContext.collect(i);
                    }
                }
            }
            @Override
            public void cancel() {
            }
        }).setParallelism(2)
                .rescale()
                .print()
                .setParallelism(4);

        //4、广播 将数据分发给以下所有子任务
        stream.broadcast().print().setParallelism(2);

        //5,全局分区   将所有数据圈入一个分区
        stream.global().print().setParallelism(4);

        //6、自定义重型分区
        env.fromElements(1,2,3,4,5,6,7,8)
                .partitionCustom(new Partitioner() {
                    @Override
                    public int partition(Integer key, int i) {
                        return key % 2;
                    }

                }, new KeySelector() {

                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return integer;
                    }
                }).print().setParallelism(4)
        ;

        env.execute();
    }
}

测试结果:
1,随机分区

2,轮询分区

3、rescale重新缩放分区

从测试结果可以看出,1,3,5,7这些奇数都在3,4在这两个分区内,0,2,4,6这些偶数都在1,2在这两个部门内部。

4、广播

设置平行度2,使用广播的这个物理分区,将每条数据分配给每个频道。

5,全局分区


从测试结果来看,所有数据都在。1在此分区中。

6、自定义重型分区

我们的代码将分区分为两部分,尽管后来的并行性是这样的。4,但是 return key % 2 已经分为2所以并行在这里不起作用。我们在测试结果中看到,0,2,4,6,8这些偶数都在1在该分区中,1,3,5,7这些奇数都在2在此分区中。

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除