博客
关于我
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql
阅读量:827 次
发布时间:2019-03-26

本文共 5551 字,大约阅读时间需要 18 分钟。

Flink自定义数据源实践指南:两种实现方案解析

在这篇文章中,我将为大家分享 Flink 的自定义数据源配置方案,其中包括两种常见的实现方式:自定义数据源的实现和基于 MySQL 数据源的配置。通过这两种方法,你可以灵活管理和处理数据流源,从而实现更多场景下的数据处理需求。

一、Flink 自定义数据源介绍

作为一名 Flink 开发者,你很可能会遇到数据源的需求场景。在 Flink 中,数据源接口提供了灵活的数据获取方式,而实现这个接口的组件可以成为自定义的数据源。根据不同应用场景,自定义数据源的实现方式可能有所不同。

以下是两种自定义数据源实现方法的主要内容:

二、自定义数据源实现与应用

  • 自定义数据源的实现方法
  • 在 Flink 中,自定义数据源的实现可以按照以下步骤进行:

    • 确定数据源类型:根据项目需求选择合适的 SourceFunction 或 RichParallelSourceFunction 类型,灵活处理并行度配置。

    • 定义数据生成逻辑:通过编写具体的 source 方法实现数据生成模块。这可以是一个简单的随机数生成器,或者更复杂的业务数据初始化模块。

    • 实现生命周期管理:确保自定义数据源能够在运行和取消运行时正确处理资源释放。例如,在 RichParallelSourceFunction 中实现 run 方法主循环,增加退出标志位。

    1. MySQL 数据源配置方法
    2. 如果你需要将现有的 MySQL 数据库数据以流化形式处理,可以通过自定义一个 MySQL 数据源组件来实现:

      • 加载必要的依赖:在 Maven 项目中添加 MySQL 连接器依赖,确保数据库连接可行。

      • 创建 JavaBean 数据类:定义一个与数据库表结构一致的数据对象类,用于存储数据字段信息。

      • 实现自定义数据源:继承 RichParallelSourceFunction,获取数据库连接并执行查询。确保在运行循环中实现高效的数据读取方式,同时支持数据库查询的条件扩展。

      • 配置并行度:根据数据读取的并行需求,设置合适的并行度参数。

      三、示例配置与验证

    3. 自定义数据源配置步骤
      • 安装项目依赖:确保 project 文件中包含正确的 Flink 和 MySQL 连接依赖。

      • 实现自定义源类:编写自定义源类,定义数据字段及其获取逻辑。例如,以下是一个简单的 User 类定义示例:

      package org.datastreamapi.source.custom.bean;
      import lombok.Data;
      import lombok.NoArgsConstructor;
      @Data
      @NoArgsConstructor
      public class User {
      private int id;
      private String name;
      private String email;
      private long clickCount;
      private long registeredAt;
      }
      • 编写自定义源实现:
      package org.datastreamapi.source.custom;
      import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
      import org.datastreamapi.source.custom.bean.User;
      import java.sql.PreparedStatement;
      import java.sql.SQLException;
      import java.util.Random;
      public class CustomUserSource extends RichParallelSourceFunction
      {
      private boolean running = true;
      private Random random = new Random();
      @Override
      public void run(SourceContext
      ctx) throws Exception {
      while (running) {
      ctx.collect(new User(
      Math.abs(random.nextInt(1000000)), // 生成用户唯一ID
      "_renter" +iales%100,
      "alanchan" + String.randomUUID().toString(),
      random.nextDouble() * 1000,
      System.currentTimeMillis()
      ));
      Thread.sleep(100); // 每隔100ms生成一条数据
      }
      }
      @Override
      public void cancel() {
      running = false;
      }
      }
      1. 数据源配置与验证
        • 在 Main 类中初始化环境,并添加数据源:
        public class TestCustomSourceDemo {
        public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream
        userDS = env.addSource(new CustomUserSource())
        .setParallelism(2); // 设置并行度可根据场景调整
        userDS.print().execute(env);
        }
        }
        • 启动程序并查看输出结果:确保数据源能够按预期生成数据,并输出到控制台。

        四、MySQL 数据源配置

        1.MySQL 数据源接口实现

        在这种情况下,我们需要实现一个能够从 MySQL 数据库读取数据的自定义数据源。详细步骤如下:

        • 加载 MySQL 连接依赖:
        com.stackoverflow
        mysql-connector-java
        5.1.38
        • 定义数据持有者类:创建一个与数据库表对应的 JavaBean 类。

        • 实现自定义数据源:

        package org.datastreamapi.source.custom.mysql;
        import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
        import org.datastreamapi.source.custom.mysql.bean.User;
        import java.sql.Connection;
        import java.sql.DriverManager;
        import java.sql.PreparedStatement;
        import java.sql.ResultSet;
        public class CustomMySQLSource extends RichParallelSourceFunction
        {
        private Connection conn;
        private PreparedStatement ps;
        private ResultSet rs;
        @Override
        public void open(Configuration parameters) throws Exception {
        String jdbcUrl = "jdbc:mysql://localhost:3306/test";
        String username = "root";
        String password = "password";
        conn = DriverManager.getConnection(jdbcUrl, username, password);
        String sql = "SELECT id, name, email, click_count, registered_at FROM user";
        ps = conn.prepareStatement(sql);
        }
        @Override
        public void run(SourceContext
        ctx) throws Exception {
        while (running) {
        rs = ps.executeQuery();
        while (rs.next()) {
        User user = new User(
        rs.getInt("id"),
        rs.getString("name"),
        rs.getString("email"),
        rs.getInt("click_count"),
        rs.getLong("registered_at")
        );
        ctx.collect(user);
        }
        Thread.sleep(5000); // 每隔5秒查询一次数据
        }
        }
        @Override
        public void cancel() {
        running = false;
        }
        }
        1. 数据源使用与测试
          • 在主类中配置并运行:
          public class TestCustomMySQLSourceDemo {
          public static void main(String[] args) {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
          DataStream
          userDS = env.addSource(new CustomMySQLSource())
          .setParallelism(1); // 设置并行度
          userDS.print().execute(env);
          }
          }
          • 在数据库中创建用户表,并插入测试数据:
          CREATE TABLE user (
          id INT PRIMARY KEY AUTO_INCREMENT,
          name VARCHAR(255) DEFAULT NULL,
          email VARCHAR(255) DEFAULT NULL,
          click_count INT DEFAULT 0,
          registered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
          );
          INSERT INTO user (name, email, click_count)
          VALUES ('John Doe', 'john@example.com', 100),
          ('Jane Smith', 'jane@example.com', 200);
          • 执行程序后,控制台将显示从数据库读取到的用户数据。

          五、总结

          在本文中,我们探讨了两种 Flink 自定义数据源的实现方法:基于自定义数据生成和基于 MySQL 数据库的数据源配置。通过这些方法,你可以根据不同的需求选择适合的数据源接口,并灵活配置。

          如果你需要进一步的信息或需求扩展,可以查阅更多的开发文档和技术案例。期待您的持续关注与反馈。

    转载地址:http://gnmyk.baihongyu.com/

    你可能感兴趣的文章
    MySQL 字符串截取函数,字段截取,字符串截取
    查看>>
    MySQL 存储引擎
    查看>>
    mysql 存储过程 注入_mysql 视图 事务 存储过程 SQL注入
    查看>>
    MySQL 存储过程参数:in、out、inout
    查看>>
    mysql 存储过程每隔一段时间执行一次
    查看>>
    mysql 存在update不存在insert
    查看>>
    Mysql 学习总结(86)—— Mysql 的 JSON 数据类型正确使用姿势
    查看>>
    Mysql 学习总结(87)—— Mysql 执行计划(Explain)再总结
    查看>>
    Mysql 学习总结(88)—— Mysql 官方为什么不推荐用雪花 id 和 uuid 做 MySQL 主键
    查看>>
    Mysql 学习总结(89)—— Mysql 库表容量统计
    查看>>
    mysql 实现主从复制/主从同步
    查看>>
    mysql 审核_审核MySQL数据库上的登录
    查看>>
    mysql 导入 sql 文件时 ERROR 1046 (3D000) no database selected 错误的解决
    查看>>
    mysql 导入导出大文件
    查看>>
    MySQL 导出数据
    查看>>
    mysql 将null转代为0
    查看>>
    mysql 常用
    查看>>
    MySQL 常用列类型
    查看>>
    mysql 常用命令
    查看>>
    Mysql 常见ALTER TABLE操作
    查看>>