序
本文主要研究一下powerjob的单机线程并发度(threadConcurrency)
threadConcurrency
powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java
/**
 * Describes one job instance as seen by the worker: identifiers, processor
 * information, timeout, runtime parameters and per-machine concurrency settings.
 */
@Data
public class InstanceInfo implements Serializable {

    /**
     * Basic information
     */
    private Long jobId;
    private Long instanceId;
    private Long wfInstanceId;

    /**
     * Processor information of the job
     */
    // Execution type of the job: standalone, broadcast, MR
    private String executeType;
    // Processor type (JavaBean, Jar, script, etc.)
    private String processorType;
    // Processor information
    private String processorInfo;
    // Time expression (scheduling) type
    private int timeExpressionType;

    /**
     * Timeout
     */
    // Overall timeout of the whole job instance
    private long instanceTimeoutMS;

    /**
     * Runtime parameters of the job
     */
    // Job-level parameters, comparable to a class's static variables
    private String jobParams;
    // Instance-level parameters, comparable to a class's ordinary (instance) variables
    private String instanceParams;
    // Upper bound of processing threads on each machine
    private int threadConcurrency;
    // Retry count for sub-tasks (the retry of the job itself is controlled by the server)
    private int taskRetryNum;

    private String logConfig;
}
InstanceInfo定义了threadConcurrency,即每台机器的处理线程数上限
maxDispatchNum
powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
/** * 定时扫描数据库中的task(出于内存占用量考虑,每次最多获取100个),并将需要执行的任务派发出去 */ protected class Dispatcher implements Runnable { // 数据库查询限制,每次最多查询几个任务 private static final int DB_QUERY_LIMIT = 100; @Override public void run() { if (finished.get()) { return; } Stopwatch stopwatch = Stopwatch.createStarted(); // 1. 获取可以派发任务的 ProcessorTracker List<String> availablePtIps = ptStatusHolder.getAvailableProcessorTrackers(); // 2. 没有可用 ProcessorTracker,本次不派发 if (availablePtIps.isEmpty()) { log.debug("[TaskTracker-{}] no available ProcessorTracker now.", instanceId); return; } // 3. 避免大查询,分批派发任务 long currentDispatchNum = 0; long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L; AtomicInteger index = new AtomicInteger(0); // 4. 循环查询数据库,获取需要派发的任务 while (maxDispatchNum > currentDispatchNum) { int dbQueryLimit = Math.min(DB_QUERY_LIMIT, (int) maxDispatchNum); List<TaskDO> needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, dbQueryLimit); currentDispatchNum += needDispatchTasks.size(); needDispatchTasks.forEach(task -> { // 获取 ProcessorTracker 地址,如果 Task 中自带了 Address,则使用该 Address String ptAddress = task.getAddress(); if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) { ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size()); } dispatchTask(task, ptAddress); }); // 数量不足 或 查询失败,则终止循环 if (needDispatchTasks.size() < dbQueryLimit) { break; } } log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop()); } }
这里会计算maxDispatchNum(availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L),之后通过availablePtIps.get(index.getAndIncrement() % availablePtIps.size())去轮询派发任务
ProcessorTracker
powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java
calThreadPoolSize
private int calThreadPoolSize() { ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType()); // 脚本类自带线程池,不过为了少一点逻辑判断,还是象征性分配一个线程 if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) { return 1; } if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) { return instanceInfo.getThreadConcurrency(); } if (TimeExpressionType.FREQUENT_TYPES.contains(instanceInfo.getTimeExpressionType())) { return instanceInfo.getThreadConcurrency(); } return 2; }
ProcessorTracker的calThreadPoolSize方法会根据ProcessorType、ExecuteType、TimeExpressionType来确定线程池大小,比如ProcessorType.PYTHON或者ProcessorType.SHELL返回1,ExecuteType.MAP_REDUCE、ExecuteType.MAP、TimeExpressionType.FREQUENT_TYPES返回的是instanceInfo.getThreadConcurrency(),其余情况返回2
initThreadPool
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128; private void initThreadPool() { int poolSize = calThreadPoolSize(); // 待执行队列,为了防止对内存造成较大压力,内存队列不能太大 BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE); // 自定义线程池中线程名称 (PowerJob Processor Pool -> PPP) ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPP-%d").build(); // 拒绝策略:直接抛出异常 RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.AbortPolicy(); threadPool = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler); // 当没有任务执行时,允许销毁核心线程(即线程池最终存活线程个数可能为0) threadPool.allowCoreThreadTimeOut(true); }
initThreadPool这里创建了ArrayBlockingQueue,大小为128,RejectedExecutionHandler为AbortPolicy,直接抛出异常RejectedExecutionException
submitTask
public void submitTask(TaskDO newTask) { // 一旦 ProcessorTracker 出现异常,所有提交到此处的任务直接返回失败,防止形成死锁 // 死锁分析:TT创建PT,PT创建失败,无法定期汇报心跳,TT长时间未收到PT心跳,认为PT宕机(确实宕机了),无法选择可用的PT再次派发任务,死锁形成,GG斯密达 T_T if (lethal) { ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq() .setInstanceId(instanceId) .setSubInstanceId(newTask.getSubInstanceId()) .setTaskId(newTask.getTaskId()) .setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue()) .setResult(lethalReason) .setReportTime(System.currentTimeMillis()); TransportUtils.ptReportTask(report, taskTrackerAddress, workerRuntime); return; } boolean success = false; // 1. 设置值并提交执行 newTask.setInstanceId(instanceInfo.getInstanceId()); newTask.setAddress(taskTrackerAddress); HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(instanceInfo, taskTrackerAddress, newTask, processorBean, omsLogger, statusReportRetryQueue, workerRuntime); try { threadPool.submit(heavyProcessorRunnable); success = true; } catch (RejectedExecutionException ignore) { log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.", instanceId, newTask.getTaskId(), newTask.getTaskName()); } catch (Exception e) { log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e); } // 2. 
回复接收成功 if (success) { ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq(); reportReq.setInstanceId(instanceId); reportReq.setSubInstanceId(newTask.getSubInstanceId()); reportReq.setTaskId(newTask.getTaskId()); reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue()); reportReq.setReportTime(System.currentTimeMillis()); TransportUtils.ptReportTask(reportReq, taskTrackerAddress, workerRuntime); log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.", instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size()); } }
submitTask这里会根据TaskDO创建HeavyProcessorRunnable,然后提交到threadPool,若有异常则success为false,只有成功了才会创建ProcessorReportTaskStatusReq,回复接收任务成功。若有RejectedExecutionException则会打印warn日志[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.
onReceiveProcessorReportTaskStatusReq
powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java
@Handler(path = WTT_HANDLER_REPORT_TASK_STATUS) public AskResponse onReceiveProcessorReportTaskStatusReq(ProcessorReportTaskStatusReq req) { int taskStatus = req.getStatus(); // 只有重量级任务才会有两级任务状态上报的机制 HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); // 手动停止 TaskTracker 的情况下会出现这种情况 if (taskTracker == null) { log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req); return null; } if (ProcessorReportTaskStatusReq.BROADCAST.equals(req.getCmd())) { taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult()); } taskTracker.updateTaskStatus(req.getSubInstanceId(), req.getTaskId(), taskStatus, req.getReportTime(), req.getResult()); // 更新工作流上下文 taskTracker.updateAppendedWfContext(req.getAppendedWfContext()); // 结束状态需要回复接受成功 if (TaskStatus.FINISHED_STATUS.contains(taskStatus)) { return AskResponse.succeed(null); } return null; }
TaskTrackerActor接收到ProcessorReportTaskStatusReq,会通过updateTaskStatus更新状态,如果是FINISHED_STATUS状态则回复接收成功AskResponse.succeed(null)
TaskStatus
powerjob-worker/src/main/java/tech/powerjob/worker/common/constants/TaskStatus.java
/**
 * Status of a task, each constant carrying an integer value and a description.
 */
@Getter
@AllArgsConstructor
public enum TaskStatus {

    // Waiting to be dispatched
    WAITING_DISPATCH(1, "等待调度器调度"),
    // Dispatched, but reception by the worker is not guaranteed
    DISPATCH_SUCCESS_WORKER_UNCHECK(2, "调度成功(但不保证worker收到)"),
    // Received by the worker, execution not started yet
    WORKER_RECEIVED(3, "worker接收成功,但未开始执行"),
    // Being executed by the worker
    WORKER_PROCESSING(4, "worker正在执行"),
    // Execution on the worker failed
    WORKER_PROCESS_FAILED(5, "worker执行失败"),
    // Execution on the worker succeeded
    WORKER_PROCESS_SUCCESS(6, "worker执行成功");

    private final int value;
    private final String des;

    // Values of the two terminal statuses (failed / success)
    public static final Set<Integer> FINISHED_STATUS =
            Sets.newHashSet(WORKER_PROCESS_FAILED.getValue(), WORKER_PROCESS_SUCCESS.getValue());

    /**
     * Looks up the status whose integer value equals {@code v}.
     *
     * @param v the integer value of the status
     * @return the matching status
     * @throws IllegalArgumentException if no status has that value
     */
    public static TaskStatus of(int v) {
        final TaskStatus[] candidates = values();
        for (int i = 0; i < candidates.length; i++) {
            if (candidates[i].value == v) {
                return candidates[i];
            }
        }
        throw new IllegalArgumentException("no TaskStatus match the value of " + v);
    }
}
task_info表中的status一共有等待调度WAITING_DISPATCH、调度成功(但不保证worker收到)DISPATCH_SUCCESS_WORKER_UNCHECK、worker接收成功WORKER_RECEIVED、worker处理中WORKER_PROCESSING、worker处理失败WORKER_PROCESS_FAILED、worker处理成功WORKER_PROCESS_SUCCESS这几个状态,其中处理成功和处理失败为完结状态
HeavyProcessorRunnable
powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java
public void run() { // 切换线程上下文类加载器(否则用的是 Worker 类加载器,不存在容器类,在序列化/反序列化时会报 ClassNotFoundException) Thread.currentThread().setContextClassLoader(processorBean.getClassLoader()); try { innerRun(); } catch (InterruptedException ignore) { // ignore } catch (Throwable e) { reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString(), null, null); log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", task.getInstanceId(), e); } finally { ThreadLocalStore.clear(); } } public void innerRun() throws InterruptedException { final BasicProcessor processor = processorBean.getProcessor(); String taskId = task.getTaskId(); Long instanceId = task.getInstanceId(); log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName()); ThreadLocalStore.setTask(task); ThreadLocalStore.setRuntimeMeta(workerRuntime); // 0. 构造任务上下文 WorkflowContext workflowContext = constructWorkflowContext(); TaskContext taskContext = constructTaskContext(); taskContext.setWorkflowContext(workflowContext); // 1. 上报执行信息 reportStatus(TaskStatus.WORKER_PROCESSING, null, null, null); ProcessResult processResult; ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); // 2. 根任务 & 广播执行 特殊处理 if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName()) && executeType == ExecuteType.BROADCAST) { // 广播执行:先选本机执行 preProcess,完成后 TaskTracker 再为所有 Worker 生成子 Task handleBroadcastRootTask(instanceId, taskContext); return; } // 3. 最终任务特殊处理(一定和 TaskTracker 处于相同的机器) if (TaskConstant.LAST_TASK_NAME.equals(task.getTaskName())) { handleLastTask(taskId, instanceId, taskContext, executeType); return; } // 4. 
正式提交运行 try { processResult = processor.process(taskContext); if (processResult == null) { processResult = new ProcessResult(false, "ProcessResult can't be null"); } } catch (Throwable e) { log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e); processResult = new ProcessResult(false, e.toString()); } reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null, workflowContext.getAppendedContextData()); }
HeavyProcessorRunnable的run方法委派给了innerRun,它捕获Throwable异常然后上报为WORKER_PROCESS_FAILED状态;innerRun方法在被执行时,先上报状态为WORKER_PROCESSING,之后回调processor.process进行处理,若处理成功则上报WORKER_PROCESS_SUCCESS,否则上报WORKER_PROCESS_FAILED
小结
- powerjob的InstanceInfo定义了threadConcurrency,即每台机器的处理线程数上限
- HeavyTaskTracker会计算maxDispatchNum(availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L),之后通过availablePtIps.get(index.getAndIncrement() % availablePtIps.size())去轮询派发任务
- ProcessorTracker的calThreadPoolSize方法会根据ProcessorType、ExecuteType、TimeExpressionType来确定线程池大小,比如ProcessorType.PYTHON或者ProcessorType.SHELL返回1,ExecuteType.MAP_REDUCE、ExecuteType.MAP、TimeExpressionType.FREQUENT_TYPES返回的是instanceInfo.getThreadConcurrency(),其余情况返回2;initThreadPool这里创建了ArrayBlockingQueue,大小为128,RejectedExecutionHandler为AbortPolicy,直接抛出异常RejectedExecutionException;submitTask这里会根据TaskDO创建HeavyProcessorRunnable,然后提交到threadPool,若有异常则success为false,只有成功了才会创建ProcessorReportTaskStatusReq,回复接收任务成功
- TaskTrackerActor接收到ProcessorReportTaskStatusReq,会通过updateTaskStatus更新状态,如果是FINISHED_STATUS状态则回复接收成功AskResponse.succeed(null)
- HeavyProcessorRunnable的run方法委派给了innerRun,它捕获Throwable异常然后上报为WORKER_PROCESS_FAILED状态;innerRun方法在被执行时,先上报状态为WORKER_PROCESSING,之后回调processor.process进行处理,若处理成功则上报WORKER_PROCESS_SUCCESS,否则上报WORKER_PROCESS_FAILED
原文地址:https://mp.weixin.qq.com/s/JE4suP7GZQZ_uQ_64rToxQ