bigdata
axios cross-origin requests (dev proxy)
// vue.config.js
module.exports = {
  lintOnSave: false,
  devServer: {
    proxy: {
      '/api': {
        target: 'http://<target-host>:<port>', // backend host and port to forward to
        changeOrigin: true
      }
    }
  }
}
Chart visualization (ECharts)
<template>
<div id="main" ref="char" style="width: 600px; height: 400px;"></div>
</template>
<script>
import echarts from '@/assets/echarts.js';
import axios from '@/assets/axios.js';
export default {
mounted(){
axios.get("http://192.168.1.134:3000/api").then(response =>{
const categories = response.data.categories;
const name = response.data.series[0].name;
const type = response.data.series[0].type;
const data = response.data.series[0].data;
console.log(response.data);
var myChart = echarts.init(this.$refs.char);
const option = {
title:{
text: 'ECharts getting-started example'
},
tooltip: {},
xAxis:{
data: categories
},
yAxis:{},
series: [{
name,
type, // line, pie, bar
data // for a pie chart: data:[{value:200,name:"apple"},{value:200,name:"peach"}]
}]
}
myChart.setOption(option);
})
}
}
</script>
<style>
</style>
Kafka basic operations
# Start the broker
bin/kafka-server-start.sh -daemon config/server.properties
# Create a topic
./kafka-topics.sh --create --topic test --partitions 3 --replication-factor 1 --zookeeper master:2181,slave1:2181,slave2:2181
# List topics
./kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
# Delete a topic
./kafka-topics.sh --delete --bootstrap-server master:9092,slave1:9092,slave2:9092 --topic test
# Start a console consumer
./kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092 --topic test --from-beginning
# Start a console producer
./kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic test
Bash
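Besides the console producer, a few test records can be pushed into the test topic from Java. A minimal sketch: the broker list and topic name follow the commands above; the record value is a made-up placeholder.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // the record value is only a placeholder; real data comes from the upstream source
            producer.send(new ProducerRecord<>("test", "hello kafka"));
            producer.flush();
        }
    }
}
Java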
HBase basic operations
# Start HDFS, ZooKeeper and HBase
start-dfs.sh
zkServer.sh start
start-hbase.sh
# Enter the HBase shell
hbase shell
# Web UI: http://<ip>:16010
Bash
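The Flink jobs below write into an HBase table named hbase_order_detail with a single details column family, so the table has to exist first (the shell equivalent is create 'hbase_order_detail','details'). A minimal sketch using the Admin API:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateOrderDetailTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(); // reads hbase-site.xml from the classpath
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("hbase_order_detail");
            if (!admin.tableExists(tableName)) {
                // one column family "details", matching the process functions below
                admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("details"))
                        .build());
            }
        }
    }
}
Java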
Flink consumes Kafka and writes to the DWD layer
package first;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
public class WriteKafka {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("master:9092,slave1:9092,slave2:9092")
.setGroupId("test")
.setTopics("test")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
kafkaStream.print();
// write back to Kafka
// build the KafkaSink
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("master:9092,slave1:9092,slave2:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("test1") // Kafka主题
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
// send the data stream to Kafka
kafkaStream.sinkTo(sink);
// run the job
env.execute("Kafka Sink Example");
}
}
Java
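With checkpointing enabled, the sink above can also be built for exactly-once delivery, which additionally needs a transactional id prefix. A sketch of the changed pieces, assuming a connector version that exposes these builder methods (newer releases spell the guarantee setter setDeliveryGuarantee); the prefix string is a made-up example.
// in WriteKafka: enable checkpointing right after creating env
env.enableCheckpointing(5000);

// build the sink with EXACTLY_ONCE semantics instead of AT_LEAST_ONCE
KafkaSink<String> exactlyOnceSink = KafkaSink.<String>builder()
        .setBootstrapServers("master:9092,slave1:9092,slave2:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("test1")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("dwd-writer") // any unique string works
        .build();
Java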
Flink consumes Kafka, writes to HBase, and creates a Hive external table
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import second.HbaseProcess;
public class Kafka {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("master:9092,slave1:9092,slave2:9092")
.setGroupId("test")
.setTopics("test")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
kafkaStream.print();
kafkaStream.process(new HbaseProcess());
//stream.map();
env.execute("Kafka");
}
}
Java
package second;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
public class HbaseProcess extends ProcessFunction<String, String> {
private static final long serialVersionUID = 1L;
private Connection connection = null;
private Table table = null;
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
// create the HBase configuration
Configuration configuration = HBaseConfiguration.create();
// load the HBase/Hadoop config files
configuration.addResource(new Path("/home/ye/hbase-site.xml"));
configuration.addResource(new Path("/home/ye/core-site.xml"));
// open the connection to HBase
connection = ConnectionFactory.createConnection(configuration);
// the HBase table to write to
TableName tableName = TableName.valueOf("hbase_order_detail");
// get the table handle
table = connection.getTable(tableName);
}
@Override
public void close() throws Exception {
// release the table and connection
if (null != table) table.close();
if (null != connection) connection.close();
}
// @Override
// public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
// // skip if the table handle is not available
// if (table == null) {
// return;
// }
//
// // parse the colon-separated input: rowkey:family:qualifier:value
// String[] split = value.split(":");
// // check the input format
// if (split.length < 4) {
// return;
// }
//
// // build the Put that adds the cell to the table
// Put put = new Put(Bytes.toBytes(split[0]));
// put.addColumn(Bytes.toBytes(split[1]), Bytes.toBytes(split[2]), Bytes.toBytes(split[3]));
// // write it to HBase
// table.put(put);
// }
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
// skip if the table handle is not available
if (table == null) {
return;
}
// parse the comma-separated input line
String[] split = value.split(",");
// sanity check: expect the row key plus the nine columns listed below
if (split.length < 10) {
return;
}
// use the first field as the HBase row key
Put put = new Put(Bytes.toBytes(split[0]));
// column family name
String columnFamily = "details";
// write the remaining fields into their columns
String[] columns = {"order_id", "product_id", "produce_name", "image_url", "purchase_price", "purchase_quantity", "creation_time", "source_type", "source_id"};
for (int i = 0; i < columns.length; i++) {
// treat a literal "NULL" as an empty string
String data = split[i+1].equals("NULL") ? "" : split[i+1];
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columns[i]), Bytes.toBytes(data));
}
// write the row to HBase
table.put(put);
}
}
Java
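To spot-check that rows actually arrive in HBase, a small read-back sketch with the client API; the row key "1" is only a placeholder.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadOrderDetail {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("hbase_order_detail"))) {
            // fetch one row and print a single column from the "details" family
            Result result = table.get(new Get(Bytes.toBytes("1"))); // "1" is a placeholder row key
            byte[] orderId = result.getValue(Bytes.toBytes("details"), Bytes.toBytes("order_id"));
            System.out.println(orderId == null ? "row not found" : Bytes.toString(orderId));
        }
    }
}
Java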
Flink consumes Kafka, writes to HBase, and creates a Hive external table (JSON format)
import com.wz.second.HbaseJson;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Kafka {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("master:9092,slave1:9092,slave2:9092")
.setGroupId("test2")
.setTopics("test2")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
kafkaStream.print();
kafkaStream.process(new HbaseJson());
//stream.map();
env.execute("Kafka");
}
}
Java
package com.wz.second;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
public class HbaseJson extends ProcessFunction<String, String> {
private static final long serialVersionUID = 1L;
private Connection connection = null;
private Table table = null;
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
// create the HBase configuration
Configuration configuration = HBaseConfiguration.create();
// load the HBase/Hadoop config files from the classpath
configuration.addResource(new Path("hbase-site.xml"));
configuration.addResource(new Path("core-site.xml"));
// open the connection to HBase
connection = ConnectionFactory.createConnection(configuration);
// the HBase table to write to
TableName tableName = TableName.valueOf("test");
// get the table handle
table = connection.getTable(tableName);
}
@Override
public void close() throws Exception {
// release the table and connection
if (null != table) table.close();
if (null != connection) connection.close();
}
// @Override
// public void processElement(String value, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {
// JsonObject jsonObject = new Gson().fromJson(value, JsonObject.class);
//
// String rowKey = jsonObject.get("id").getAsString();
// Put put = new Put(Bytes.toBytes(rowKey));
//
// jsonObject.entrySet().forEach(entry -> {
// String column = entry.getKey();
// String val = entry.getValue().toString();
// put.addColumn(Bytes.toBytes("details"), Bytes.toBytes(column), Bytes.toBytes(val));
// });
//
// table.put(put);
// }
@Override
public void processElement(String value, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {
JsonObject jsonObject = JsonParser.parseString(value).getAsJsonObject();
// extract the "data" object
JsonObject dataObject = jsonObject.getAsJsonObject("data");
if (dataObject != null) {
String rowKey = dataObject.get("id").getAsString();
Put put = new Put(Bytes.toBytes(rowKey));
dataObject.entrySet().forEach(entry -> {
String column = entry.getKey();
String val = entry.getValue().getAsString();
put.addColumn(Bytes.toBytes("details"), Bytes.toBytes(column), Bytes.toBytes(val));
});
table.put(put);
}
}
}
Java
Create the Hive external table
CREATE EXTERNAL TABLE hive_order_detail(
primary_key string,
order_id string,
product_id string,
product_name string,
image_url string,
purchase_price string,
purchase_quantity string,
creation_time timestamp,
source_type string,
source_id string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
'hbase.columns.mapping'=':key,details:order_id,details:product_id,details:produce_name,details:image_url,details:purchase_price,details:purchase_quantity,details:creation_time,details:source_type,details:source_id'
)
TBLPROPERTIES (
'hbase.table.name'='hbase_order_detail');
SQL
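To verify the mapping end to end, the external table can be queried through HiveServer2 over JDBC. A sketch assuming HiveServer2 listens on master:10000 and the default database; the user and password are placeholders.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class QueryOrderDetail {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://master:10000/default", "hive", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT primary_key, order_id, purchase_price FROM hive_order_detail LIMIT 10")) {
            while (rs.next()) {
                // each Hive row is backed by one HBase row written by the process function above
                System.out.println(rs.getString(1) + "\t" + rs.getString(2) + "\t" + rs.getString(3));
            }
        }
    }
}
Java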
Flink processes data and writes to Redis
package com.wz.third;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
public class FlinkCostKafkaExec {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
KafkaSource<String> myConsumer = KafkaSource.<String>builder()
.setBootstrapServers("master:9092,slave1:9092,slave2:9092")
.setGroupId("test")
.setTopics("test")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(myConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.print();
DataStream<Tuple2<String,Double>> totalamounts =stream.flatMap(new totalAmount())
.map(value -> new Tuple2<>("totallprice",value))
.returns(TypeInformation.of(new TypeHint<Tuple2<String,Double>>(){}));
DataStream<Tuple2<String,Double>> totalamount = totalamounts.keyBy(value -> value.f0)
.reduce((value1,value2) -> new Tuple2<>(value1.f0,value1.f1+value2.f1))
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}));
// DataStream<Tuple2<String,Double>> totalAmounts = kafkaStream.flatMap(new totalAmount())
// .map(value -> new Tuple2<>("totalprice",value))
// .returns(TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}))
// .keyBy(value -> value.f0)
// .reduce((value1,value2)->new Tuple2<>(value1.f0, value1.f1+value2.f1))
// .returns(TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {
// }));
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
.setHost("master")
.setPort(6379)
.build();
SinkFunction<Tuple2<String,Double>> redisSink = new RedisSink<>(config,new RedisTotalAmountWriter());
totalamount.addSink(redisSink);
env.execute("write");
}
private static class totalAmount implements FlatMapFunction<String,Double> {
@Override
public void flatMap(String s, Collector<Double> collector) throws Exception {
String[] fields = s.split(",\\s");
if (fields.length>5){
try{
String orderStatus = fields[4];
if ("1001".equals(orderStatus)||"1002".equals(orderStatus)||"1004".equals(orderStatus)){
double amount = Double.parseDouble(fields[3]);
collector.collect(amount);
}
}catch (NumberFormatException e){
}
}
}
}
private static class RedisTotalAmountWriter implements RedisMapper<Tuple2<String,Double>>{
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET,null);
}
@Override
public String getKeyFromData(Tuple2<String, Double> stringDoubleTuple2) {
return "totalprice";
}
@Override
public String getValueFromData(Tuple2<String, Double> stringDoubleTuple2) {
return String.valueOf(stringDoubleTuple2.f1);
}
}
}
private static long TimeTiqu(String event) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
try{
JsonObject jsonObject = JsonParser.parseString(event).getAsJsonObject();
if (jsonObject.has("data") && !jsonObject.get("data").isJsonNull()) {
JsonObject dataObject = jsonObject.getAsJsonObject("data");
long createtime = Long.MIN_VALUE;
if (dataObject.has("createtime") && !dataObject.get("createtime").isJsonNull()) {
createtime = format.parse(dataObject.get("createtime").getAsString()).getTime();
}
long operatime =Long.MIN_VALUE;
if(dataObject.has("operatiome")&&!dataObject.get("operatime").isJsonNull()){
operatime = format.parse(dataObject.get("opratiome").getAsString()).getTime();
}
return Math.max(createtime,operatime);
}
}catch (Exception e){
e.printStackTrace();
}
return Long.MIN_VALUE;
}
@Override
public void processElement(String value, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {
// parse the received JSON string
JSONObject jsonObject = new JSONObject(value);
// try to read the nested "data" object
JSONObject dataObject = jsonObject.optJSONObject("data");
// only proceed if "data" is present
if (dataObject != null) {
// use the "id" field of "data" as the row key
String rowKey = dataObject.getString("id");
// create a Put for inserting or updating this row
Put put = new Put(Bytes.toBytes(rowKey));
// walk over every key/value pair in "data"
for (String key : dataObject.keySet()) {
// read the field value
String val = dataObject.getString(key);
// store every field under the "details" column family
put.addColumn(Bytes.toBytes("details"), Bytes.toBytes(key), Bytes.toBytes(val));
}
// write the Put to the HBase table
table.put(put);
}
}
Java
Compute the average
// in the totalAmount class
public static final class totalAmount implements FlatMapFunction<String, Tuple2<Double, Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<Double, Integer>> collector) throws Exception {
String[] fields = s.split(",\\s");
if (fields.length > 5) {
try {
String orderStatus = fields[4];
if ("1001".equals(orderStatus) || "1002".equals(orderStatus) || "1004".equals(orderStatus)) {
Double amount = Double.parseDouble(fields[3]);
collector.collect(new Tuple2<>(amount, 1)); // collect the amount together with a count of 1
}
} catch (NumberFormatException e) {
// ignore records whose amount is not a valid number
}
}
}
}
// in the main method
DataStream<Tuple2<String, Double>> totalAmounts = kafkaStream
.flatMap(new totalAmount())
.returns(TypeInformation.of(new TypeHint<Tuple2<Double, Integer>>() {}))
.keyBy(value -> "averageprice")
.reduce((value1, value2) -> new Tuple2<>(value1.f0 + value2.f0, value1.f1 + value2.f1))
.returns(TypeInformation.of(new TypeHint<Tuple2<Double, Integer>>() {}))
.map(value -> new Tuple2<>("averageprice", value.f0 / value.f1))
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}));
// the RedisSink configuration and the rest of the code stay unchanged
Java
Compute the minimum
// keep calling the totalAmount class
DataStream<Tuple2<String, Double>> minAmounts = kafkaStream
.flatMap(new totalAmount())
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}))
.keyBy(value -> "totalprice") // 固定键,因为我们寻找全局最小值
.reduce((value1, value2) -> value1.f1 < value2.f1 ? value1 : value2)
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}));
// RedisSink configuration
SinkFunction<Tuple2<String, Double>> sink = new RedisSink<>(config, new MyRedisMapper());
minAmounts.addSink(sink);
env.execute("write");
public static final class totalAmount implements FlatMapFunction<String, Tuple2<String, Double>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Double>> out) throws Exception {
// assumes the Kafka records are comma-separated and the amount sits at a fixed position
String[] fields = value.split(","); // adjust the delimiter to the actual format
if (fields.length > YOUR_AMOUNT_FIELD_INDEX) {
try {
Double amount = Double.parseDouble(fields[YOUR_AMOUNT_FIELD_INDEX]); // replace YOUR_AMOUNT_FIELD_INDEX with the actual index
out.collect(new Tuple2<>("minamount", amount));
} catch (NumberFormatException e) {
// if parsing fails, log it or simply skip the record
}
}
}
}
Java
Merge records with the same consignee field and count the total per consignee
public static final class ParseConsigneeFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
// parse the Kafka message and extract the consignee field
// assumes consignee is one of the comma-separated fields
String[] fields = value.split(",");
if (fields.length > YOUR_CONSIGNEE_FIELD_INDEX) {
String consignee = fields[YOUR_CONSIGNEE_FIELD_INDEX];
out.collect(new Tuple2<>(consignee, 1));
}
}
}
DataStream<Tuple2<String, Integer>> consigneeCounts = kafkaStream
.flatMap(new ParseConsigneeFunction())
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
.keyBy(value -> value.f0) // group by the consignee field
.reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1))
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
SinkFunction<Tuple2<String, Integer>> sink = new RedisSink<>(config, new MyRedisMapper());
consigneeCounts.addSink(sink);
env.execute("write");
Java
Compute the overall count
public static final class ParseConsigneeFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(new Tuple2<>("totalConsignees", 1));
}
}
DataStream<Tuple2<String, Integer>> totalConsigneeCount = kafkaStream
.flatMap(new ParseConsigneeFunction())
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
.keyBy(value -> value.f0) // a single fixed key, "totalConsignees"
.reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1))
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
SinkFunction<Tuple2<String, Integer>> sink = new RedisSink<>(config, new MyRedisMapper());
totalConsigneeCount.addSink(sink);
env.execute("write");
Java
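The three snippets above reference a MyRedisMapper that is not shown here. A minimal sketch of what it could look like for the Tuple2<String, Integer> counts, mirroring the RedisTotalAmountWriter pattern used earlier; for the Tuple2<String, Double> minimum, change the generic type accordingly.
private static class MyRedisMapper implements RedisMapper<Tuple2<String, Integer>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // plain SET: each tuple key becomes its own Redis key
        return new RedisCommandDescription(RedisCommand.SET, null);
    }
    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        return data.f0;
    }
    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        return String.valueOf(data.f1);
    }
}
Java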
Flink processes data and writes to Redis (with a watermark)
package com.wz.third;
// write to Redis, adding an event-time watermark
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.time.Duration;
public class FlinkCostKafka_3 {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
KafkaSource<String> myConsumer = KafkaSource.<String>builder()
.setBootstrapServers("master:9092,slave1:9092,slave2:9092")
.setGroupId("test")
.setTopics("test")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> extractTimestamp(event));
DataStream<String> stream = env.fromSource(myConsumer, watermarkStrategy, "Kafka Source");
stream.print();
DataStream<Tuple2<String,Double>> totalamounts = stream.flatMap(new totalAmount())
.map(value -> new Tuple2<>("totallprice", value))
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){}));
DataStream<Tuple2<String,Double>> totalamount = totalamounts.keyBy(value -> value.f0)
.reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1))
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){}));
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
.setHost("master")
.setPort(6379)
.build();
SinkFunction<Tuple2<String,Double>> redisSink = new RedisSink<>(config, new RedisTotalAmountWriter());
totalamount.addSink(redisSink);
env.execute("write");
}
private static long extractTimestamp(String event) {
String[] fields = event.split(",\\s");
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
long createTime = fields[10].isEmpty() ? Long.MIN_VALUE : format.parse(fields[10]).getTime();
long operateTime = fields[11].isEmpty() ? Long.MIN_VALUE : format.parse(fields[11]).getTime();
return Math.max(createTime, operateTime);
} catch (Exception e) {
return Long.MIN_VALUE;
}
}
private static class totalAmount implements FlatMapFunction<String, Double> {
@Override
public void flatMap(String s, Collector<Double> collector) throws Exception {
String[] fields = s.split(",\\s");
if (fields.length > 5) {
try {
String orderStatus = fields[4];
if ("1001".equals(orderStatus) || "1002".equals(orderStatus) || "1004".equals(orderStatus)) {
double amount = Double.parseDouble(fields[3]);
collector.collect(amount);
}
} catch (NumberFormatException e) {
// Handle number format exception if needed
}
}
}
}
private static class RedisTotalAmountWriter implements RedisMapper<Tuple2<String, Double>> {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET, null);
}
@Override
public String getKeyFromData(Tuple2<String, Double> stringDoubleTuple2) {
return "totalprice";
}
@Override
public String getValueFromData(Tuple2<String, Double> stringDoubleTuple2) {
return String.valueOf(stringDoubleTuple2.f1);
}
}
}
Java
Number display format
double number = 1.9786518E7;
String formattedNumber = String.format("%.0f", number);
System.out.println(formattedNumber); // prints the number without a decimal point
// or use DecimalFormat
import java.text.DecimalFormat;
double number = 1.9786518E7;
DecimalFormat formatter = new DecimalFormat("#");
String formattedNumber = formatter.format(number);
System.out.println(formattedNumber); // prints the number without a decimal point; "#" shows only the digits that are needed
Java
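Applied to the Redis writers above, this keeps values such as 1.9786518E7 out of Redis. A sketch of getValueFromData using DecimalFormat, under the same RedisMapper contract as before:
@Override
public String getValueFromData(Tuple2<String, Double> stringDoubleTuple2) {
    // store "19786518" instead of the scientific notation "1.9786518E7"
    return new java.text.DecimalFormat("#").format(stringDoubleTuple2.f1);
}
Java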
Flink processes data and writes to Redis (a side output handles refund-completed orders and writes them to Redis too)
package com.wz.third;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
import java.time.Duration;
public class FlinkCostKafka_4 {
final static OutputTag<Tuple2<String, Double>> refundOutputTag = new OutputTag<Tuple2<String, Double>>("refund-output") {};
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
KafkaSource<String> myConsumer = KafkaSource.<String>builder()
.setBootstrapServers("master:9092,slave1:9092,slave2:9092")
.setGroupId("test")
.setTopics("test")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> extractTimestamp(event));
DataStream<String> stream = env.fromSource(myConsumer, watermarkStrategy, "Kafka Source");
SingleOutputStreamOperator<Tuple2<String, Double>> processedStream = stream.process(new ProcessFunction<String, Tuple2<String, Double>>() {
@Override
public void processElement(String value, Context ctx, Collector<Tuple2<String, Double>> out) throws Exception {
String[] fields = value.split(",\\s");
if (fields.length > 5) {
try {
String orderStatus = fields[4];
double amount = Double.parseDouble(fields[3]);
if ("1001".equals(orderStatus) || "1002".equals(orderStatus) || "1004".equals(orderStatus)) {
out.collect(new Tuple2<>("totalprice", amount));
} else if ("1006".equals(orderStatus)) {
ctx.output(refundOutputTag, new Tuple2<>("totalrefundordercount", amount));
}
} catch (NumberFormatException e) {
// Handle number format exception if needed
}
}
}
});
DataStream<Tuple2<String, Double>> refundStream = processedStream.getSideOutput(refundOutputTag);
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
.setHost("master")
.setPort(6379)
.build();
SinkFunction<Tuple2<String, Double>> redisSink = new RedisSink<>(config, new RedisTotalAmountWriter());
processedStream.addSink(redisSink);
refundStream.addSink(redisSink);
env.execute("write");
}
private static long extractTimestamp(String event) {
String[] fields = event.split(",\\s");
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
long createTime = fields[10].isEmpty() ? Long.MIN_VALUE : format.parse(fields[10]).getTime();
long operateTime = fields[11].isEmpty() ? Long.MIN_VALUE : format.parse(fields[11]).getTime();
return Math.max(createTime, operateTime);
} catch (Exception e) {
return Long.MIN_VALUE;
}
}
private static class RedisTotalAmountWriter implements RedisMapper<Tuple2<String, Double>> {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET, null);
}
@Override
public String getKeyFromData(Tuple2<String, Double> stringDoubleTuple2) {
return stringDoubleTuple2.f0;
}
@Override
public String getValueFromData(Tuple2<String, Double> stringDoubleTuple2) {
return String.valueOf(stringDoubleTuple2.f1);
}
}
}
Java
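To check what the job has written, the keys can be read back with a plain Jedis client. A sketch assuming Redis runs on master:6379, as configured above:
import redis.clients.jedis.Jedis;

public class CheckRedis {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("master", 6379)) {
            // the keys match what RedisTotalAmountWriter writes
            System.out.println("totalprice = " + jedis.get("totalprice"));
            System.out.println("totalrefundordercount = " + jedis.get("totalrefundordercount"));
        }
    }
}
Java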
Flink processes data and writes to ClickHouse
package com.wz.third;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.math.BigDecimal;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.time.Duration;
import org.apache.flink.configuration.Configuration;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
public class FlinkCostKafka_3 {
final static OutputTag<String> cancelOrderOutputTag = new OutputTag<String>("cancel-order-output") {};
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
KafkaSource<String> myConsumer = KafkaSource.<String>builder()
.setBootstrapServers("master:9092,slave1:9092,slave2:9092")
.setGroupId("test")
.setTopics("test")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> extractTimestamp(event));
DataStream<String> stream = env.fromSource(myConsumer, watermarkStrategy, "Kafka Source");
SingleOutputStreamOperator<Tuple2<String, Double>> processedStream = stream.process(new ProcessFunction<String, Tuple2<String, Double>>() {
@Override
public void processElement(String value, Context ctx, Collector<Tuple2<String, Double>> out) throws Exception {
String[] fields = value.split(",\\s");
if (fields.length > 5) {
try {
String orderStatus = fields[4];
double amount = Double.parseDouble(fields[3]);
if ("1001".equals(orderStatus) || "1002".equals(orderStatus) || "1004".equals(orderStatus)) {
out.collect(new Tuple2<>("totalprice", amount));
} else if ("1003".equals(orderStatus)) {
ctx.output(cancelOrderOutputTag, value);
}
} catch (NumberFormatException e) {
// Handle number format exception if needed
}
}
}
});
DataStream<String> cancelOrderStream = processedStream.getSideOutput(cancelOrderOutputTag);
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
.setHost("master")
.setPort(6379)
.build();
SinkFunction<Tuple2<String, Double>> redisSink = new RedisSink<>(config, new RedisTotalAmountWriter());
processedStream.addSink(redisSink);
cancelOrderStream.addSink(new ClickHouseSink());
env.execute("Flink Kafka to Redis and ClickHouse");
}
private static long extractTimestamp(String event) {
String[] fields = event.split(",\\s");
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
long createTime = fields[10].isEmpty() ? Long.MIN_VALUE : format.parse(fields[10]).getTime();
long operateTime = fields[11].isEmpty() ? Long.MIN_VALUE : format.parse(fields[11]).getTime();
return Math.max(createTime, operateTime);
} catch (Exception e) {
return Long.MIN_VALUE;
}
}
private static class totalAmount implements FlatMapFunction<String, Double> {
@Override
public void flatMap(String s, Collector<Double> collector) throws Exception {
String[] fields = s.split(",\\s");
if (fields.length > 5) {
try {
String orderStatus = fields[4];
if ("1001".equals(orderStatus) || "1002".equals(orderStatus) || "1004".equals(orderStatus)) {
double amount = Double.parseDouble(fields[3]);
collector.collect(amount);
}
} catch (NumberFormatException e) {
// Handle number format exception if needed
}
}
}
}
private static class RedisTotalAmountWriter implements RedisMapper<Tuple2<String, Double>> {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET, null);
}
@Override
public String getKeyFromData(Tuple2<String, Double> stringDoubleTuple2) {
return "totalprice";
}
@Override
public String getValueFromData(Tuple2<String, Double> stringDoubleTuple2) {
return String.valueOf(stringDoubleTuple2.f1);
}
}
private static class ClickHouseSink extends RichSinkFunction<String> {
private ClickHouseConnection conn;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ClickHouseProperties properties = new ClickHouseProperties();
properties.setUser("your_username");
properties.setPassword("your_password");
ClickHouseDataSource dataSource = new ClickHouseDataSource("jdbc:clickhouse://<your_clickhouse_host>:<port>", properties);
conn = dataSource.getConnection();
}
@Override
public void invoke(String value, Context context) throws Exception {
String[] fields = value.split(",\\s");
// 构建并执行 SQL 语句
String sql = "INSERT INTO order_info (consignee, consignee_tel, final_total_amount, order_status, user_id, delivery_address, order_comment, out_trade_no, trade_body, create_time, operate_time, expire_time, tracking_no, parent_order_id, img_url, province_id, benefit_reduce_amount, original_total_amount, feight_fee) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, consignee); // varchar
pstmt.setString(2, consigneeTel); // varchar
pstmt.setBigDecimal(3, finalTotalAmount); // decimal
pstmt.setString(4, orderStatus); // varchar
pstmt.setLong(5, userId); // bigint
pstmt.setString(6, deliveryAddress); // varchar
pstmt.setString(7, orderComment); // varchar
pstmt.setString(8, outTradeNo); // varchar
pstmt.setString(9, tradeBody); // varchar
pstmt.setTimestamp(10, createTime); // datetime
pstmt.setTimestamp(11, operateTime); // datetime
pstmt.setTimestamp(12, expireTime); // datetime
pstmt.setString(13, trackingNo); // varchar
pstmt.setLong(14, parentOrderId); // bigint
pstmt.setString(15, imgUrl); // varchar
pstmt.setInt(16, provinceId); // int
pstmt.setBigDecimal(17, benefitReduceAmount); // decimal
pstmt.setBigDecimal(18, originalTotalAmount); // decimal
pstmt.setBigDecimal(19, feightFee); // decimal
pstmt.execute();
}
}
@Override
public void close() throws Exception {
if (conn != null) {
conn.close();
}
super.close();
}
}
}
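The sink assumes that an order_info table already exists in ClickHouse. A possible DDL, executed once (for example in open()) over the same connection; the MergeTree engine, the ordering key and the column types are assumptions to adjust to the real schema.
// inside ClickHouseSink.open(), after the connection has been created
try (java.sql.Statement stmt = conn.createStatement()) {
    stmt.execute("CREATE TABLE IF NOT EXISTS order_info (" +
            "consignee String, consignee_tel String, final_total_amount Decimal(16,2), " +
            "order_status String, user_id Int64, delivery_address String, order_comment String, " +
            "out_trade_no String, trade_body String, create_time DateTime, operate_time Nullable(DateTime), " +
            "expire_time Nullable(DateTime), tracking_no String, parent_order_id Int64, img_url String, " +
            "province_id Int32, benefit_reduce_amount Decimal(16,2), original_total_amount Decimal(16,2), " +
            "feight_fee Decimal(16,2)) ENGINE = MergeTree() ORDER BY create_time");
}
Java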