文章目录
- 一、背景
- 二、功能模块划分
- 1. 作业台
-
- 2.任务列表
-
- 3.项目管理
- 4.模板管理
- 5.UDF管理
- 三、问题解决
- 1. kerberos认证问题
- 2.分布式锁解决Job名称冲突问题
- 3.自定义线程池用以监控线程运行情况
- 4. 待补充(TODO)
- 其他
- 1. 大量的运用了BeanUtils.copyBean,why?
- 2. SpringBoot结合Mybatis-plus自动回填获取ID
- 3. 使用RequestContextHolder获取session会话请求入参来校验登录
- 总结
一、背景
实时业务刚起步的时候,各个部门无论是使用sparkstreaming还是使用flink做实时任务开发基本上处于最原始的命令行方式提交,然后靠人力去运维和监控。这就造成了N多的项目比如代码维护、监控、通用配置等无法有效管理,造成实时任务开发和运维成本高涨,与此同时我们希望能尽快先解决一个最核心功能类解决任务提交jar/sql实时作业的问题,然后再通用的平台上实现对实时任务的运维管理和简单监控等。以下记录了从0到1各个模块实现的关键点如下:
二、功能模块划分
1. 作业台
主要功能
1、创建Jar任务 2、创建SQL类型任务
任务生命周期
- 创建(通过任务名称、任务类型等参数新增job并为新增的job生成一个初始版本并返回此job的ID(应用递增))
- 保存(save) (保存涉及了上一个步骤的 JobMeta以及提交的资源文件和配置信息三大部分) 用来更新job
- 发布(job release) 自定义接口FlinkClientService 封装flink run运行命令使用线程池异步提交job
- 取消 (job cancel) 使用flink stop命令:
flink stop -m yarn-cluster -yid application_1584340574618_0841 -p hdfs:///flink/checkpoints/ 550d2fd38b874db23629e0525c62fc2a
- 上下线 (设置job标记位)
2.任务列表
主要功能
1、查看JOB列表,可以依据不同的查询条件检索
2、启动(指定savepoint)查看日志、复制任务,取消任务、状态修改等
3.项目管理
该模块以项目为管理方式进行收口,一个项目下可以包含多个实时任务,如没有项目默认已登录者姓名为项目名称,主要是为了方便管理。
4.模板管理
对于固定的模板比如sql样例、或者固定的jar只需修改的参数的任务,可以直接基于模板创建,简单方便快捷。
5.UDF管理
在工作台页面内置常用共有函数,在sql开发页面只需要将对应的函数名拖到sql编辑页就能自动完成导入。如:
CREATE FUNCTION StringLengthUdf AS 'com.test.udx.StringLengthUdf';
同时支持用户创建个人私有函数, 完成发布后方可使用。
三、问题解决
1. kerberos认证问题
由于我们的任务是运行在yarn上,而Hadoop集群又是基于Kerberos进行认证的,所以我们在用户提交jar的时候就封装了认证文件,如下所示:
command = "export HADOOP_CLASSPATH=`hadoop classpath` && kinit -kt /home/deploy/kerberos/app_prd.keytab app_prd && " + command;
try {
Process process = new ProcessBuilder("sh", "-c", command).redirectErrorStream(true).start();
StringBuilder builder = new StringBuilder();
try (BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
String line;
while ((line = br.readLine()) != null) {
if (lineFilter.test(line)) {
builder.append(line).append("\n");
}
}
}
String runLog = builder.toString();
return runLog;
启动日志显示认证成功:
2021-04-04 15:06:18,861 INFO org.apache.hadoop.security.UserGroupInformation - Login successful for user app_prd@FAYSON.COM using keytab file nf.
2.分布式锁解决Job名称冲突问题
在initJob的时候,这里基于Redis作为分布式来解决并发修改同名job的问题,如下:
String lockKey = metaDTO.getName();
String clientId = GlobalConstant.REDIS_KEY_PREFFIX + System.nanoTime();
try {
if (redisUtils.tryLock(lockKey, clientId, GlobalConstant.REDIS_LOCK_TIME)) {
this.updateById(metaDO);
jobResourceFileService.addFile(file, jobId, fullJobInfoDTO.getResourceFileDTO());
RealtimeJobResourceConfigDTO resourceConfigDTO = fullJobInfoDTO.getResourceConfigDTO();
jobResourceConfigService.add(resourceConfigDTO, jobId);
RealtimeJobMetaDTO currentMetaDTO = this.getByJobId(jobId);
jobMetaHistoryService.add(currentMetaDTO);
jobResourceFileHistoryService.add(fullJobInfoDTO.getResourceFileDTO(), currentMetaDTO.getVersionId());
jobResourceConfigHistoryService.add(resourceConfigDTO, currentMetaDTO.getVersionId());
}
} finally {
redisUtils.releaseLock(lockKey, clientId);
}
在这里主要是借助于RedisUtils工具类中获取锁和释放锁,代码逻辑:
private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
public Boolean tryLock(String lockKey, String clientId, long seconds) {
return stringRedisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
Jedis jedis = (Jedis) redisConnection.getNativeConnection();
String result = jedis.set(lockKey, clientId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, seconds);
if (LOCK_SUCCESS.equals(result)) {
return Boolean.TRUE;
}
return Boolean.FALSE;
});
}
public Boolean releaseLock(String lockKey, String clientId) {
return stringRedisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
Jedis jedis = (Jedis) redisConnection.getNativeConnection();
Object result = jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(lockKey),
Collections.singletonList(clientId));
if (RELEASE_SUCCESS.equals(result)) {
return Boolean.TRUE;
}
return Boolean.FALSE;
});
}
至于为什么释放锁需要调用脚本的方式进行参考如下:
那么删除锁的正确姿势之一,就是可以使用 Lua 脚本,通过 Redis 的 eval/evalsha 命令来运行:
-- lua删除锁:
-- KEYS和ARGV分别是以集合方式传入的参数,对应上文的Test和uuid。
-- 如果对应的value等于传入的uuid。
if redis.call('get', KEYS[1]) == ARGV[1]
then
-- 执行删除操作
return redis.call('del', KEYS[1])
else
-- 不成功,返回0
return 0
end
这里使用了利用Redis执行 Lua的原子性来达到数据一致性,具体参考https://blog.csdn.net/weixin_39540271/article/details/112229979
3.自定义线程池用以监控线程运行情况
这里我们只需要继承spring框架提供的ThreadPoolTaskExecutor,其实它也是实现了Executor框架,可以进行异步提交任务。
以下时该类的继承关系:
ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor
public interface SchedulingTaskExecutor extends AsyncTaskExecutor
public interface AsyncTaskExecutor extends TaskExecutor
public interface TaskExecutor extends Executor {
public interface Executor {
void execute(Runnable command);
}
暂且在这里做下继承实现简单的日志打印动作,如下:
public class RealJobThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
@Value("${xx.thread.pool.queue.capacity}")
private int queueCapacity;
private static double QUEUE_THRESHOLD = 0.8;
private static final Logger logger = LoggerFactory.getLogger(RealJobThreadPoolTaskExecutor.class);
private void showThreadPoolInfo(String prefix) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if (null == threadPoolExecutor) {
return;
}
if (threadPoolExecutor.getQueue().size() > queueCapacity * QUEUE_THRESHOLD) {
logger.error("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}] is near overflow",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size());
} else {
logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size());
}
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
基于此、配置Spring管理下的Configuration以便于自动注入:
@Configuration
public class ThreadPoolConfig {
@Value("${xx.thread.pool.core.size}")
private int corePoolSize;
@Value("${xx.thread.pool.max.size}")
private int maxPoolSize;
@Value("${xx.thread.pool.queue.capacity}")
private int queueCapacity;
@Value("${xx.thread.pool.keep.alive.seconds}")
private int keepAlive;
@Bean(name = "ThreadPool2CronJob")
public JindowinJobThreadPoolTaskExecutor threadPool2CronJob() {
RealJobThreadPoolTaskExecutor executor = new RealJobThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAlive);
executor.setThreadNamePrefix("ThreadPool2CronJob");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
4. 待补充(TODO)
其他
1. 大量的运用了BeanUtils.copyBean,why?
BeanUtils好用而且便捷,将开发者从繁重的get set操作中解放出来。但是要注意:
- 两个拷贝类之间需要拷贝的属性字段名必须要一样,当它们字段值不一样的时候,这时候就需要手动获取并赋值
参考:https://www.jianshu.com/p/357b55852efc
2. SpringBoot结合Mybatis-plus自动回填获取ID
this.save(metaDO);
System.out.println(metaDO.getId());
3. 使用RequestContextHolder获取session会话请求入参来校验登录
POST /sql/template/search?currentUserId=828¤tUserName=%E5%85%B0%E5%85%B0 HTTP/1.1
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
public class SessionUtils {
private static final String CURRENT_USER_ID = "currentUserId";
private static final String CURRENT_USERNAME = "currentUserName";
public static Long currentUserId() {
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
String idStr = request.getParameter(CURRENT_USER_ID);
return StringUtils.isBlank(idStr) ? null : Long.valueOf(idStr);
}
public static String currentUsername() {
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
String idStr = request.getParameter(CURRENT_USERNAME);
return StringUtils.isBlank(idStr) ? null : idStr;
}
public static boolean validUser() {
return currentUserId() != null && currentUsername() != null;
}
}
总结
该平台以flink-sql-submit为基础封装了任务提交的的相关命令,然后结合自身业务情况打造出这样一个平台。其实也有开源的实现如:flink-streaming-platform-we 如果为了快速应用不妨试试,而且目前来看主体的功能很全,适合二次开发。我们当前的平台还处于早期,主要聚焦于最核心功能的实现,其他还在持续完善中。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)