DataHub基本介绍
阿里云流数据处理平台DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布 (Publish),订阅 (Subscribe)和分发功能,让您可以轻松构建基于流式数据的分析和应用。DataHub服务可以对各种移动设备,应用软件,网站服务,传感器等产生的大量流式数据进行持续不断的采集,存储和处理。用户可以编写应用程序或者使用流计算引擎来处理写入到DataHub的流式数据比如实时web访问日志、应用日志、各种事件等,并产出各种实时的数据处理结果比如实时图表、报警信息、实时统计等。
DataHub服务基于阿里云自研的飞天平台,具有高可用,低延迟,高可扩展,高吞吐的特点。DataHub与阿里云流计算引擎StreamCompute无缝连接,用户可以轻松使用SQL进行流数据分析。
DataHub服务也提供分发流式数据到各种云产品的功能,目前支持分发到MaxCompute(原ODPS),OSS等。
系统整体功能图
高吞吐
最高支持单shard每日8000万Record级别的写入量。
实时性
通过 DataHub ,您可以实时的收集各种方式生成的数据并进行实时的处理,对您的业务产生快速的响应。
易用性
DataHub 提供丰富的SDK包,包括C++, JAVA, Pyhon, Ruby, Go等语言。
DataHub服务也提供Restful API规范,您可以用自己的方式实现访问接口。
除了SDK以外,DataHub 还提供一些常用的客户端插件,包括:Fluentd,LogStash,Flume等。您可以使用这些客户端工具往 DataHub 里面写入流式数据。
DataHub 同时支持强Schema的结构化数据(创建Tuple类型的Topic)和无类型的非结构化数据(创建Blob类型的Topic),您可以自由选择。
高可用
服务可用性不低于99.9%。
规模自动扩展,不影响对外服务;数据持久性不低于99.999%。
数据自动多重冗余备份。
动态伸缩
每个主题(Topic)的数据流吞吐能力可以动态扩展和减少,最高可达到每主题256000 Records/s的吞吐量。
高安全性
提供企业级多层次安全防护,多用户资源隔离机制;
提供多种鉴权和授权机制及白名单、主子账号功能。
DataHub作为一个流式数据处理服务,结合阿里云众多云产品,可以构建一站式的数据处理服务。
流计算StreamCompute
StreamCompute是阿里云提供的流计算引擎,提供使用类SQL的语言来进行流式计算。DataHub 和StreamCompute无缝结合,可以作为StreamCompute的数据源和输出源,具体可参考实时计算文档
流处理应用
用户可以编写应用订阅DataHub中的数据,并进行实时的加工,把加工后的结果输出。用户可以把应用计算产生的结果输出到DataHub中,并使用另外一个应用来处理上一个应用生成的流式数据,来构建数据处理流程的DAG。
流式数据归档
用户的流式数据可以归档到 MaxCompute(原ODPS)中。用户通过创建DataHub Connector,指定相关配置,即可创建将Datahub中流式数据定期归档的同步任务。
以上来自于阿里云平台 Datahub在线文档
好了,先进入正题,Datahub 2.15版本是支持协同消费的,单是要保证阿里云数据总线服务器是最新版本的,否则是不支持协同消费类的,但是低版本数据总线服务器是支持高版本sdk向下兼容的
一开始我们新建一个项目
项目新建完成以后,再新建这个项目下的topic
pom依赖
<dependency>
<groupId>com.aliyun.datahubgroupId>
<artifactId>aliyun-sdk-datahubartifactId>
<version>2.15.0-publicversion>
dependency>
<dependency>
<groupId>com.aliyun.datahubgroupId>
<artifactId>datahub-client-libraryartifactId>
<version>1.1.8-publicversion>
dependency>
# datahub配置文件
public:
datahub:
# 配置信息
config:
#控制器
startup: false
#应用服务器地址
endpoint: XXXXXXXXXXXXXXXXXXXXXXXXXXXX
#用户权限accessId
accessId: XXXXXXXXXXXXXXXXX
#用户权限accessKey
accessKey: XXXXXXXXXXXXXXXx
serivce:
#应用名称
- projectName: XXXXXXXX
#应用消费——topic名称
topicGet: XXXXXXXXXXXXXXXXXX
#应用生产——topic名称
topicSet: XXXXXXXXXXXXXXXXXX
#topicid的订阅ID
subId: XXXXXXXXXXXXXXXXXX
package com.encdata.oss.datahubClient;
import com.encdata.oss.system.domain.dto.DataHubConfigDTO;
import com.encdata.oss.system.handler.CustomerThreadPool;
import com.encdata.oss.system.handler.task.DatahubConsumerTask;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static com.encdata.oss.system.controller.BaseController.executorService;
/**
* Created by IntelliJ IDEA.
* datahub 配置文件类
* @author liyiq
* @date 2020/06/04
*/
@Data
@Component
@ConfigurationProperties(prefix = "public.datahub.config")
public class DatahubConfig {
/**
* datahub 控制器
*/
private Boolean startup;
/**
* datahub应用服务器地址
*/
private String endpoint;
public static String endpoints;
/**
* datahub 用户权限accessId
*/
private String accessId;
public static String accessIds;
/**
* datahub 用户权限accessKey
*/
private String accessKey;
public static String accessKeys;
/**
* 应用配置信息集合
*/
private List<Map> serivce;
public static List<Map> serivces = new ArrayList<>();
/**
* datahub 应用名称
*/
private String projectName;
/**
* datahub 应用消费——topic名称
*/
private String topicGet;
/**
* datahub topicid的订阅ID
*/
private String subId;
/**
* 初始化datahub协同消费类
* 支持订阅多个topic
*/
@PostConstruct
public void datahubRecycling(){
//todo: 静态变量赋值
endpoints = this.endpoint;
accessIds = this.accessId;
accessKeys = this.accessKey;
serivces = this.serivce;
//todo:开启关闭datahub消费控制器
if(startup){
if(executorService == null){
executorService = CustomerThreadPool.createDefaultThreadPool();
}
//todo: 多应用分割
for(int s=0;s<serivce.size();s++){
Map map = serivce.get(s);
projectName = map.get("projectName").toString();
topicGet = map.get("topicGet").toString();
subId = map.get("subId").toString();
String[] topics = topicGet.split(",");
//todo: 多topic循环监听
for(int i=0;i<topics.length;i++){
//todo:封装datahub配置DTO
DataHubConfigDTO dataHubConfigDTO = new DataHubConfigDTO();
dataHubConfigDTO
.setEndpoint(endpoint)
.setAccessId(accessId)
.setAccessKey(accessKey)
.setProjectName(projectName)
.setTopicName(topics[i])
.setSubId(subId);
//todo:初始化监听线程
executorService.execute(new DatahubConsumerTask(dataHubConfigDTO));
}
}
}
}
}
package com.encdata.oss.system.handler;
import java.util.concurrent.*;
/**
* 自定义线程池
*
* Created by IntelliJ IDEA.
*
* @author yangyi
* @date 2020/06/04
*/
public class CustomerThreadPool {
/** 线程池核心线程数,即线程池中常驻的线程数量 **/
private static final int DEFAULT_CORE_POLL_SIZE = 8;
/** 线程池允许的最大线程数,非核心线程在超时之后会被清除,受限于 CAPACITY,需要根据实际的物理机配置去计算 **/
private static final int DEFAULT_MAXIMUM_POOL_SIZE = 1024;
/** 线程没有任务执行时可以保持的时间【非核心线程】 **/
private static final long DEFAULT_KEEP_ALIVE_TIME = 0;
/** keepAliveTime 的时间单位 **/
private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MICROSECONDS;
/** 阻塞队列的最大容量 **/
private static final Integer MAX_WORK_QUEUE_CAPACITY = 1024*10;
/** 任务阻塞队列,用于存储等待执行的任务,默认采用有界队列 **/
private static final BlockingQueue<Runnable> DEFAULT_WORK_QUEUE = new ArrayBlockingQueue<Runnable>(MAX_WORK_QUEUE_CAPACITY);
/** 线程工厂,用来创建线程,可自定义对线程的控制**/
private static final ThreadFactory DEFAULT_THREAD_FACTORY = Executors.defaultThreadFactory();
/**
* rejectHandler:当任务队列已满时,拒绝任务提交时的策略:
AbortPolicy【默认】:丢掉任务,并抛RejectedExecutionException异常。
DiscardPolicy:直接丢掉任务,不抛异常。
DiscardOldestPolicy:丢掉最老的任务,然后调用execute立刻执行该任务(新进来的任务)。
CallerRunsPolicy【推荐】:在调用者的当前线程去执行这个任务。
*/
private static final RejectedExecutionHandler DEFAULT_HANDLER = new ThreadPoolExecutor.CallerRunsPolicy();
/**
* 创建线程池
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param keepAliveTime 没有任务执行时非核心线程可以保持的时间
* @param unit keepAliveTime 的时间单位
* @param workQueue 任务阻塞队列,用于存储等待执行的任务
* @param threadFactory 线程工厂
* @param handler 当任务队列已满时,拒绝任务提交时的策略
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor createThreadPool(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, handler);
}
/***
* 创建线程池
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param keepAliveTime 没有任务执行时非核心线程可以保持的时间
* @param unit keepAliveTime 的时间单位
* @param workQueue 任务阻塞队列,用于存储等待执行的任务
* @param threadFactory 线程工厂
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor createThreadPool (int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
return createThreadPool(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, DEFAULT_HANDLER);
}
/***
* 创建线程池
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param keepAliveTime 没有任务执行时非核心线程可以保持的时间
* @param unit keepAliveTime 的时间单位
* @param workQueue 任务阻塞队列,用于存储等待执行的任务
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor createThreadPool(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return createThreadPool(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, DEFAULT_THREAD_FACTORY);
}
/***
* 创建线程池
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param keepAliveTime 没有任务执行时非核心线程可以保持的时间
* @param unit keepAliveTime 的时间单位
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor createThreadPool(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit) {
return createThreadPool(corePoolSize, maximumPoolSize, keepAliveTime, unit, DEFAULT_WORK_QUEUE);
}
/**
* 创建默认的线程池,配置如下:
* corePoolSize = 8;
* maximumPoolSize = 1024;
* keepAliveTime = 0;
* unit = TimeUnit.MICROSECONDS
* workQueue = new ArrayBlockingQueue(10240);
* threadFactory = Executors.defaultThreadFactory();
* handler = new ThreadPoolExecutor.CallerRunsPolicy();
*/
public static ThreadPoolExecutor createDefaultThreadPool() {
return createThreadPool(DEFAULT_CORE_POLL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME, DEFAULT_TIME_UNIT);
}
}
package com.encdata.oss.system.handler.task; import com.encdata.oss.system.exception.BaseException; /** * 抽象工作线程,若想进行多线程工作,可以继承此抽象类,再重写相关方法实现具体的业务逻辑 *
* Created by IntelliJ IDEA. * * @author yangyi * @date 2020/06/04 */ public abstract class BaseWorkTask implements Runnable { //工作任务线程 /*@Override public void run() { // 工作示例 System.out.println(Thread.currentThread().getId() + " is start"); process(); System.out.println(Thread.currentThread().getId() + " is over"); }*/ /** * 业务处理方法 * * @throws BaseException 业务处理异常 */ protected abstract void process() throws BaseException; }
package com.encdata.oss.system.handler.task;
import com.encdata.oss.datahubClient.singleSubscription.SubscriptionExample;
import com.encdata.oss.system.domain.dto.DataHubConfigDTO;
import com.encdata.oss.system.exception.BaseException;
import lombok.extern.slf4j.Slf4j;
import static com.encdata.oss.system.controller.BaseController.executorService;
/**
* Created by IntelliJ IDEA.
* 多应用,多topic 数据订阅线程
* @author liyiq
* @date 2020/06/08
*/
@Slf4j
public class DatahubConsumerTask extends BaseWorkTask {
/**
* datahub配置类dto
*/
private DataHubConfigDTO dataHubConfigDTO;
/**
* datahub消费类
*/
private SubscriptionExample subscriptionExample;
public DatahubConsumerTask(DataHubConfigDTO dataHubConfigDTO){
this.dataHubConfigDTO = dataHubConfigDTO;
}
@Override
public void run() {
if (log.isDebugEnabled()) {
log.debug("DataHub数据消费信息处理线程【开始】");
}
process();
if (log.isDebugEnabled()) {
log.debug("DataHub数据消费信息处理线程【结束】=====重新请求");
executorService.execute(new DatahubConsumerTask(dataHubConfigDTO));
}
}
/**
* 创建数据消费线程
* @throws BaseException
*/
@Override
protected void process() throws BaseException {
new SubscriptionExample(dataHubConfigDTO).Start();
}
}
package com.encdata.oss.datahubClient.singleSubscription;
import com.aliyun.datahub.DatahubClient;
import com.aliyun.datahub.DatahubConfiguration;
import com.aliyun.datahub.auth.AliyunAccount;
import com.aliyun.datahub.common.data.RecordSchema;
import com.aliyun.datahub.exception.DatahubClientException;
import com.aliyun.datahub.exception.OffsetResetedException;
import com.aliyun.datahub.exception.OffsetSessionChangedException;
import com.aliyun.datahub.exception.SubscriptionOfflineException;
import com.aliyun.datahub.model.*;
import com.encdata.oss.datahubClient.TaskReleaseConsumer;
import com.encdata.oss.system.domain.dto.DataHubConfigDTO;
import java.util.List;
class Consumer{
private String projectName = null;
private String topicName = null;
private String subId = null;
private String shardId = null;
private RecordSchema schema = null;
private DatahubClient client = null;
public Consumer(String projectName, String topicName, String subId, String shardId, RecordSchema schema,
DatahubConfiguration conf) {
this.projectName = projectName;
this.topicName = topicName;
this.subId = subId;
this.shardId = shardId;
this.schema = schema;
this.client = new DatahubClient(conf);
}
private void commit(OffsetContext offsetCtx) {
client.commitOffset(offsetCtx);
//System.out.println("commit offset suc! offset context: " + offsetCtx.toObjectNode().toString());
}
public void run() {
try {
boolean bExit = false;
// 首先初始化offset上下文
OffsetContext offsetCtx = client.initOffsetContext(projectName, topicName, subId, shardId);
String cursor = null; // 开始消费的cursor
if (!offsetCtx.hasOffset()) {
// 之前没有存储过点位,先获取初始点位,比如这里获取当前该shard最早的数据
GetCursorResult cursorResult = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);
cursor = cursorResult.getCursor();
} else {
// 否则,获取当前已消费点位的下一个cursor
GetCursorResult cursorResult = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.SEQUENCE,
(offsetCtx.getOffset().getSequence() + 1));
cursor = cursorResult.getCursor();
}
/*System.out.println("Start consume shard:" + shardId + ", start offset:" + offsetCtx.toObjectNode().toString()
+ ", cursor:" + cursor);*/
long recordNum = 0L;
while (!bExit) {
try {
GetRecordsResult recordResult = client.getRecords(projectName, topicName, shardId, cursor, 10,
schema);
List<RecordEntry> records = recordResult.getRecords();
if (records.size() == 0) {
// 将最后一次消费点位上报
commit(offsetCtx);
// 可以先休眠一会,再继续消费新记录
Thread.sleep(1000);
//System.out.println("sleep 1s and continue consume records! shard id:" + shardId);
} else {
for (RecordEntry record : records) {
// 处理记录逻辑
/*System.out.println("Consume shard:" + shardId + " thread process record:"
+ record.toJsonNode().toString());*/
new TaskReleaseConsumer(topicName,record).DataParsing();
// 上报点位,该示例是每处理100条记录上报一次点位
offsetCtx.setOffset(record.getOffset());
recordNum++;
if (recordNum % 100 == 0) {
commit(offsetCtx);
}
}
cursor = recordResult.getNextCursor();
}
} catch (SubscriptionOfflineException e) {
// 订阅下线,退出
bExit = true;
e.printStackTrace();
} catch (OffsetResetedException e) {
// 点位被重置,更新offset上下文
client.updateOffsetContext(offsetCtx);
cursor = client.getCursor(projectName, topicName, shardId,
GetCursorRequest.CursorType.SEQUENCE, (offsetCtx.getOffset().getSequence() + 1)).getCursor();
System.err.println("Restart consume shard:" + shardId + ", reset offset:"
+ offsetCtx.toObjectNode().toString() + ", cursor:" + cursor);
} catch (OffsetSessionChangedException e) {
// 其他consumer同时消费了该订阅下的相同shard,退出
bExit = true;
e.printStackTrace();
} catch (Exception e) {
bExit = true;
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class SubscriptionExample {
private String endpoint;
private String accessId;
private String accessKey;
private String projectName;
private String topicName;
private String subId;
private DatahubConfiguration conf;
private DatahubClient client;
public SubscriptionExample(DataHubConfigDTO dataHubConfigDTO) {
this.endpoint=dataHubConfigDTO.getEndpoint();
this.accessId=dataHubConfigDTO.getAccessId();
this.accessKey=dataHubConfigDTO.getAccessKey();
this.projectName=dataHubConfigDTO.getProjectName();
this.topicName=dataHubConfigDTO.getTopicName();
this.subId=dataHubConfigDTO.getSubId();
this.conf = new DatahubConfiguration(new AliyunAccount(accessId, accessKey), endpoint);
this.client = new DatahubClient(conf);
}
public void Start() {
GetTopicResult topicResult = client.getTopic(projectName, topicName);
ListShardResult shardResult = client.listShard(projectName, topicName);
for (int i = 0; i < shardResult.getShards().size(); ++i) {
new Consumer(projectName, topicName, subId, shardResult.getShards().get(i).getShardId(),
topicResult.getRecordSchema(), conf).run();
}
}
public static void main(String[] args) {
String endpoint = "";
String accessId = "";
String accessKey = "";
String projectName = "";
String topicName = "";
String subId = "";
DataHubConfigDTO dataHubConfigDTO = new DataHubConfigDTO();
dataHubConfigDTO
.setEndpoint(endpoint)
.setAccessId(accessId)
.setAccessKey(accessKey)
.setProjectName(projectName)
.setTopicName(topicName)
.setSubId(subId);
SubscriptionExample example = new SubscriptionExample(dataHubConfigDTO);
try {
example.Start();
} catch (DatahubClientException e) {
e.printStackTrace();
}
}
}
package com.encdata.oss.datahubClient;
import com.aliyun.datahub.common.data.Field;
import com.aliyun.datahub.common.data.FieldType;
import com.aliyun.datahub.common.data.RecordSchema;
import com.aliyun.datahub.exception.DatahubClientException;
import com.encdata.oss.datahubClient.singleSubscription.DatahubExample;
import com.encdata.oss.system.domain.dto.DataHubConfigDTO;
import com.encdata.oss.system.domain.dto.TaskCenterPRQDTO;
import com.encdata.oss.util.ObjectParseUtils;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import static com.encdata.oss.datahubClient.DatahubConfig.*;
/**
* Created by IntelliJ IDEA.
* 任务发布状态回调类
* @author liyiq
* @date 2020/06/08
*/
@Slf4j
public class TaskReleaseProduction<T>{
private static String projectName;
private static String topicSet;
private static DataHubConfigDTO dataHubConfigDTO;
/**
* 状态回调成dto
*/
private T dto;
public TaskReleaseProduction(T dto){
this.dto=dto;
}
/**
* 向Datahub发送数据
*/
public void DataProduction(String topic){
//todo: 通过不同的应用获取不同的topic
for (int s = 0; s < serivces.size(); s++) {
Map map = serivces.get(s);
projectName = map.get("projectName").toString();
topicSet = map.get("topicSet").toString();
dataHubConfigDTO = new DataHubConfigDTO();
this.dataHubConfigDTO
.setEndpoint(endpoints)
.setAccessId(accessIds)
.setAccessKey(accessKeys)
.setProjectName(projectName)
.setTopicName(topicSet);
if (topic.equals(topicSet)) {
datahubWriter(topic);
}
}
}
public void datahubWriter(String topic){
// Endpoint以Region: 华东1为例,其他Region请按实际情况填写
RecordSchema schema = new RecordSchema();
Map map = ObjectParseUtils.objectToMap(dto);
for(Object key : map.keySet()){
schema.addField(new Field(key.toString(), FieldType.STRING));
}
//todo: 根据泛型dto定义schema
// switch (topic){
// case "task_center_platform_response":
// schema.addField(new Field("ref_id", FieldType.STRING));
// schema.addField(new Field("biz_sys_code", FieldType.STRING));
// schema.addField(new Field("biz_sys_name", FieldType.STRING));
// schema.addField(new Field("title", FieldType.STRING));
// schema.addField(new Field("result_code", FieldType.STRING));
// schema.addField(new Field("result_msg", FieldType.STRING));
// break;
// }
//todo:单独写入数据方法
DatahubExample example = new DatahubExample(dataHubConfigDTO);
try {
example.init(schema);
example.putRecords(dto);
//example.getRecords();
} catch (DatahubClientException e) {
e.printStackTrace();
}
//todo:协同消费写入数据方法
// ProducerConfig config = new ProducerConfig(endpoints, accessIds, accessKeys);
// Producer producer = new Producer(projectName, topicSet, config);
// try {
// List recordEntries = genRecords(schema);
// DatahubWriter.sendRecords(producer, recordEntries);
// } finally {
// // 确保资源正确释放
// producer.close();
// }
}
/*private List genRecords(RecordSchema schema) {
List recordEntries = new ArrayList<>();
RecordEntry entry = new RecordEntry();
//entry.addAttribute("key1", "value1");
//entry.addAttribute("key2", "value2");
TupleRecordData data = new TupleRecordData(schema);
//todo: 通过key值定义同步dto数据到data
Map map = ObjectParseUtils.objectToMap(dto);
for (Object key : map.keySet()) {
data.setField(key.toString(),map.get(key));
}
entry.setRecordData(data);
recordEntries.add(entry);
return recordEntries;
}*/
/*public static void main(String[] args) {
// Endpoint以Region: 华东1为例,其他Region请按实际情况填写
String endpoint = "https://datahub.cn-shanghai-shga-d01.dh.alicloud.ga.sh";
String accessId = "0iWV0NCs805VuAAu";
String accessKey = "iEwlgpCnXDwT93YMVDb2G60my9ne81";
String projectName = "sjc_rwzx";
String topicName = "task_center_platform_request";
RecordSchema schema = new RecordSchema();
*//*TaskCenterPRPDTO dto = new TaskCenterPRPDTO();
dto.setBiz_sys_code("")
.setBiz_sys_name("")
.setRef_id("")
.setResult_code("")
.setResult_msg("")
.setTitle("");*//*
TaskCenterPRQDTO dto = new TaskCenterPRQDTO();
dto.setBiz_sys_code("test")
.setBiz_sys_name("测试系统")
.setBrief("测试任务")
.setClose_time("")
.setFlow_start_param("")
.setRef_id("123")
.setRemark("备注")
.setSubmit_by("张三")
.setTc_flow_id("")
.setTitle("测试标题");
Map map = ObjectParseUtils.objectToMap(dto);
// for(Object key : map.keySet()){
// schema.addField(new Field(key.toString(), FieldType.STRING));
// }
schema.addField(new Field("ref_id", FieldType.STRING));
schema.addField(new Field("biz_sys_code", FieldType.STRING));
schema.addField(new Field("biz_sys_name", FieldType.STRING));
schema.addField(new Field("title", FieldType.STRING));
schema.addField(new Field("brief", FieldType.STRING));
schema.addField(new Field("submit_by", FieldType.STRING));
schema.addField(new Field("close_time", FieldType.STRING));
schema.addField(new Field("remark", FieldType.STRING));
schema.addField(new Field("tc_flow_id", FieldType.STRING));
schema.addField(new Field("flow_start_param", FieldType.STRING));
ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
Producer producer = new Producer(projectName, topicName, config);
// 根据场景控制循环
boolean stop = false;
try {
while (!stop) {
List recordEntries = new ArrayList<>();
RecordEntry entry = new RecordEntry();
//entry.addAttribute("key1", "value1");
//entry.addAttribute("key2", "value2");
TupleRecordData data = new TupleRecordData(schema);
//todo: 通过key值定义同步dto数据到data
Map map1 = ObjectParseUtils.objectToMap(dto);
for (Map.Entry entry1: map1.entrySet()) {
data.setField(entry1.getKey(),entry1.getValue());
}
entry.setRecordData(data);
recordEntries.add(entry);
DatahubWriter.sendRecords(producer, recordEntries);
}
} finally {
// 确保资源正确释放
producer.close();
}
}*/
public static void datahubSetmessage(TaskCenterPRQDTO dto) {
String endpoint = "";
String accessId = "";
String accessKey = "";
String projectName = "";
String topicName = "";
DataHubConfigDTO dataHubConfigDTO = new DataHubConfigDTO();
dataHubConfigDTO
.setEndpoint(endpoint)
.setAccessId(accessId)
.setAccessKey(accessKey)
.setProjectName(projectName)
.setTopicName(topicName);
DatahubExample example = new DatahubExample(dataHubConfigDTO);
RecordSchema schema = new RecordSchema();
// schema.addField(new Field("ref_id", FieldType.STRING));
// schema.addField(new Field("biz_sys_code", FieldType.STRING));
// schema.addField(new Field("biz_sys_name", FieldType.STRING));
// schema.addField(new Field("title", FieldType.STRING));
// schema.addField(new Field("brief", FieldType.STRING));
// schema.addField(new Field("submit_by", FieldType.STRING));
// schema.addField(new Field("close_time", FieldType.STRING));
// schema.addField(new Field("remark", FieldType.STRING));
// schema.addField(new Field("tc_flow_id", FieldType.STRING));
// schema.addField(new Field("flow_start_param", FieldType.STRING));
Map map = ObjectParseUtils.objectToMap(dto);
for(Object key : map.keySet()){
schema.addField(new Field(key.toString(), FieldType.STRING));
}
try {
example.init(schema);
example.putRecords(dto);
//example.getRecords();
//example.createADSDataConnector();
} catch (DatahubClientException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
String endpoint = "";
String accessId = "";
String accessKey = "";
String projectName = "";
String topicName = "";
DataHubConfigDTO dataHubConfigDTO = new DataHubConfigDTO();
dataHubConfigDTO
.setEndpoint(endpoint)
.setAccessId(accessId)
.setAccessKey(accessKey)
.setProjectName(projectName)
.setTopicName(topicName);
DatahubExample example = new DatahubExample(dataHubConfigDTO);
RecordSchema schema = new RecordSchema();
// schema.addField(new Field("ref_id", FieldType.STRING));
// schema.addField(new Field("biz_sys_code", FieldType.STRING));
// schema.addField(new Field("biz_sys_name", FieldType.STRING));
// schema.addField(new Field("title", FieldType.STRING));
// schema.addField(new Field("brief", FieldType.STRING));
// schema.addField(new Field("submit_by", FieldType.STRING));
// schema.addField(new Field("close_time", FieldType.STRING));
// schema.addField(new Field("remark", FieldType.STRING));
// schema.addField(new Field("tc_flow_id", FieldType.STRING));
// schema.addField(new Field("flow_start_param", FieldType.STRING));
TaskCenterPRQDTO dto = new TaskCenterPRQDTO();
dto.setBiz_sys_code("sys_task_center")
.setBiz_sys_name("任务中心")
.setBrief("任务发起测试")
.setClose_time("2020-06-21 00:00:00")
.setFlow_start_param("")
.setRef_id("1")
.setRemark("备注")
.setSubmit_by("test")
.setTc_flow_id("1")
.setTitle("测试标题");
Map map = ObjectParseUtils.objectToMap(dto);
for(Object key : map.keySet()){
schema.addField(new Field(key.toString(), FieldType.STRING));
}
try {
example.init(schema);
example.putRecords(dto);
//example.getRecords();
//example.createADSDataConnector();
} catch (DatahubClientException e) {
e.printStackTrace();
}
}
}