- import java.io.BufferedOutputStream;
- import java.io.File;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.net.HttpURLConnection;
- import java.net.URL;
- import java.util.ArrayList;
- import java.util.Calendar;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import org.apache.commons.collections4.CollectionUtils;
- import org.apache.commons.io.IOUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.joda.time.DateTime;
- import org.joda.time.format.DateTimeFormat;
- import org.joda.time.format.DateTimeFormatter;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.core.io.FileSystemResource;
- import org.springframework.http.HttpEntity;
- import org.springframework.http.HttpHeaders;
- import org.springframework.http.HttpMethod;
- import org.springframework.http.ResponseEntity;
- import org.springframework.stereotype.Service;
- import org.springframework.util.LinkedMultiValueMap;
- import org.springframework.util.MultiValueMap;
- import org.springframework.web.client.RestTemplate;
- import com.google.common.collect.Lists;
- import com.google.gson.Gson;
- import com.google.gson.JsonElement;
- import com.google.gson.JsonObject;
- import org.apache.http.HttpStatus;
- /**
- * azkaban接口
- * @author hao
- *
- */
- @Service
- public class AzkabanService {
-
- private static final Logger logger = LoggerFactory.getLogger(AzkabanService.class);
- private static final String CONTENT_TYPE = "application/x-www-form-urlencoded; charset=utf-8";
- private static final String X_REQUESTED_WITH = "XMLHttpRequest";
- private static final DateTimeFormatter formatterTime = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
-
- @Autowired
- private RestTemplate restTemplate;
-
- @Autowired
- private AzkabanConfig azkabanConfig;
-
- /**
- * Azkaban登录接口,返回sessionId
- * @return
- * @throws Exception
- */
- public String login() throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", CONTENT_TYPE);
- hs.add("X-Requested-With", X_REQUESTED_WITH);
- LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
- linkedMultiValueMap.add("action", "login");
- linkedMultiValueMap.add("username", azkabanConfig.getAzkUsername());
- linkedMultiValueMap.add("password", azkabanConfig.getAzkPassword());
- HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
- String result = restTemplate.postForObject(azkabanConfig.getAzkUrl(), httpEntity, String.class);
- if (!SysContants.AZK_SUCCESS.equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString())) {
- logger.error("Azkaban登录失败!返回错误信息:"+result);
- throw new Exception("Azkaban登录失败!");
- }
- return new Gson().fromJson(result, JsonObject.class).get("session.id").getAsString();
- }
- /**
- * Azkaban上传zip文件
- * @param projectName 项目名称
- * @param file 文件
- * @return projectId编号
- * @throws Exception
- */
- public String uploadZip(String projectName, File file) throws Exception {
- SSLUtil.turnOffSslChecking();
- FileSystemResource resource = new FileSystemResource(file);
- LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();
- linkedMultiValueMap.add("session.id", login());
- linkedMultiValueMap.add("ajax", "upload");
- linkedMultiValueMap.add("project", projectName);
- linkedMultiValueMap.add("file", resource);
- String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/manager", linkedMultiValueMap,
- String.class);
- if (StringUtils.isEmpty(new Gson().fromJson(result, JsonObject.class).get("projectId").getAsString())) {
- logger.error("上传文件至Azkaban失败:",projectName,file.getPath());
- logger.error("Azkaban上传文件失败!返回错误信息:"+result);
- return null;
- }
- return new Gson().fromJson(result, JsonObject.class).get("projectId").getAsString();
- }
-
- /**
- * Azkaban创建project
- * @param projectName,project名称
- * @param description,project描述
- * @return 是否成功
- * @throws Exception
- */
- public boolean createProject(String projectName, String description) throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", CONTENT_TYPE);
- hs.add("X-Requested-With", X_REQUESTED_WITH);
- LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
- linkedMultiValueMap.add("session.id", login());
- linkedMultiValueMap.add("action", "create");
- linkedMultiValueMap.add("name", projectName);
- linkedMultiValueMap.add("description", description);
- HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
- logger.info("Azkaban请求信息:" + httpEntity.toString());
- String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/manager", httpEntity, String.class);
- logger.info("Azkaban返回创建Project信息:" + result);
- // 创建成功和已存在,都表示创建成功
- if (!SysContants.AZK_SUCCESS
- .equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString())) {
- if (!"Project already exists."
- .equals(new Gson().fromJson(result, JsonObject.class).get("message").getAsString())) {
- logger.error("创建Azkaban Project失败:",projectName);
- logger.error("创建Azkaban Project失败!返回错误信息:"+result);
- return false;
- }
- }
- return true;
- }
-
- /**
- * Azkaban删除project
- * @param projectName 项目名称
- * @throws Exception
- */
- public void deleteProject(String projectName) throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", CONTENT_TYPE);
- hs.add("X-Requested-With", X_REQUESTED_WITH);
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", login());
- map.put("project", projectName);
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl() + "/manager?session.id={id}&delete=true&project={project}", HttpMethod.GET,
- new HttpEntity<String>(hs), String.class, map);
- if (HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
- logger.error("删除Azkaban Project失败!返回错误信息:"+exchange);
- throw new Exception("删除Azkaban Project失败");
- }
- }
-
- /**
- * 获取一个项目的所有流flows
- * @param projectName 项目名称
- * @return List<String> 项目的所有流
- * @throws Exception
- */
- public List<String> fetchFlowsProject(String projectName) throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", CONTENT_TYPE);
- hs.add("X-Requested-With", X_REQUESTED_WITH);
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", login());
- map.put("project", projectName);
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl() + "/manager?&session.id={id}&project={project}&ajax=fetchprojectflows", HttpMethod.GET,
- new HttpEntity<String>(hs), String.class, map);
- if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
- logger.error("Azkaban获取一个项目的所有流信息失败:" + projectName);
- logger.error("Azkaban获取一个项目的所有流信息失败:!返回错误信息:"+exchange);
- return null;
- }
- JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("flows");
- if (obj == null) {
- logger.error("Azkaban获取一个项目的所有流信息失败:{}:{}", projectName);
- return null;
- }
- List<String> flows = new ArrayList<String>();
- for(JsonElement jobj:obj.getAsJsonArray()) {
- flows.add(jobj.getAsJsonObject().get("flowId").getAsString());
- }
- return flows;
- }
-
- /**
- * 获取一个流的所有作业
- * @param projectName 项目名
- * @param flowId 流id
- * @return
- * @throws Exception
- */
- public String fetchJobsFlow(String projectName, String flowId) throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", CONTENT_TYPE);
- hs.add("X-Requested-With", X_REQUESTED_WITH);
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", login());
- map.put("project", projectName);
- map.put("flow", flowId);
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl() + "/manager?&session.id={id}&project={project}&flow={flow}&ajax=fetchflowgraph", HttpMethod.GET,
- new HttpEntity<String>(hs), String.class, map);
- if (exchange == null) {
- logger.error("Azkaban获取一个项目的所有流信息失败:" + projectName);
- return null;
- }
- logger.debug("Azkaban获取一个项目的所有流信息:" + exchange);
- if (HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
- throw new Exception("Azkaban获取一个项目的所有流信息失败");
- }
- return exchange.toString();
- }
-
- /**
- * Flow 获取执行的project 列表
- * azkaban api 获取流的执行
- * @param projectName 项目名
- * @param flowId 流id
- * @param start
- * @param length
- * @return
- * @throws Exception
- */
- public FlowExecution fetchFlowExecutions(String projectName, String flowId, String start,String length) throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
- hs.add("X-Requested-With", "XMLHttpRequest");
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", login());
- map.put("project", projectName);
- map.put("flow", flowId);
- map.put("start", String.valueOf(start));
- map.put("length", String.valueOf(length));
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl() + "/manager?session.id={id}&ajax=fetchFlowExecutions&project={project}&flow={flow}&start={start}&length={length}",
- HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
- if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
- logger.error("Azkaban获取一个项目运行记录信息失败:{}:{}", projectName, flowId);
- return null;
- }
- return new Gson().fromJson(exchange.getBody(), FlowExecution.class);
- }
-
- /**
- * Flow 获取正在执行的流id
- * @param projectName
- * @param flowId
- * @return
- * @throws Exception
- */
- public String getRunning(String projectName, String flowId) throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
- hs.add("X-Requested-With", "XMLHttpRequest");
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", login());
- map.put("project", projectName);
- map.put("flow", flowId);
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=getRunning&project={project}&flow={flow}", HttpMethod.GET,
- new HttpEntity<String>(hs), String.class, map);
- return exchange.getBody();
- }
-
- /**
- * Execute a Flow 执行一个流 还有很多其他参数 具体参考azkabanConfig.getAzkUrl()
- *
- * @throws KeyManagementException
- * @throws NoSuchAlgorithmException
- */
- public String executeFlow(String projectName, String flowId) throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
- hs.add("X-Requested-With", "XMLHttpRequest");
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", login());
- map.put("project", projectName);
- map.put("flow", flowId);
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=executeFlow&project={project}&flow={flow}", HttpMethod.GET,
- new HttpEntity<String>(hs), String.class, map);
- if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
- logger.error("执行一个流请求失败:{}:{}", projectName, flowId);
- return null;
- }
- JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("execid");
- if (obj == null) {
- logger.error("执行一个流失败:{}:{}", projectName, flowId);
- return null;
- }
- return obj.getAsString();
- }
-
- /**
- * Cancel a Flow Execution 取消流程执行
- * azkaban api 取消流程执行
- * @throws KeyManagementException
- * @throws NoSuchAlgorithmException
- */
- public void cancelEXEaFlow(String projectName,String start,String size) throws Exception {
- int flag=0;
- List<String> flows = fetchFlowsProject(projectName);//获取所有流
- for (String flow : flows) {
- FlowExecution fe = fetchFlowExecutions(projectName, flow, start, size);
- if (fe == null) {
- continue;
- }
- List<Execution> executions = fe.getExecutions();//获取执行id
- for (Execution execution : executions) {
- if(null!=execution&&null!=execution.getExecId()&&"RUNNING".equals(execution.getStatus())){//运行中的
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
- hs.add("X-Requested-With", "XMLHttpRequest");
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", login());
- map.put("execid", String.valueOf(execution.getExecId()));
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=cancelFlow&execid={execid}", HttpMethod.GET,
- new HttpEntity<String>(hs), String.class, map);
- System.out.println(exchange.getBody());
- if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
- logger.error("取消执行调度失败,请求azkaban接口异常:"+exchange);
- throw new Exception("取消执行调度失败,请求azkaban接口异常:"+exchange);
- }
- JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("error");
- if (obj != null) {
- throw new Exception("取消执行调度失败,请刷新列表获取最新调度状态!");
- }
- flag++;
- }
- }
- }
- if(0==flag){
- throw new Exception("该调度不是运行中状态,请刷新列表获取最新状态!");
- }
- }
-
- /**
- * 根据时间 创建调度任务
- *
- * @param projectId
- * @param projectName
- * @param flow
- * @param flowName
- * @param recurring,是否循环,on循环
- * @param period,循环频率:M:Months,w:Weeks,d:Days,h:Hours,m:Minutes,s:Seconds;如60s,支持分钟的倍数
- * @param date,开始时间
- * @return 返回scheduleId
- * @throws Exception
- */
- public String scheduleEXEaFlow(String projectId, String projectName, String flow, String flowName, String recurring,
- String period, Date date) throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", CONTENT_TYPE);
- hs.add("X-Requested-With", X_REQUESTED_WITH);
- LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
- linkedMultiValueMap.add("session.id", login());
- linkedMultiValueMap.add("ajax", "scheduleFlow");
- linkedMultiValueMap.add("projectName", projectName);
- linkedMultiValueMap.add("projectId", projectId);
- linkedMultiValueMap.add("flow", flow);
- linkedMultiValueMap.add("flowName", flowName);
- linkedMultiValueMap.add("is_recurring", recurring);
- linkedMultiValueMap.add("period", period);
- scheduleTimeInit(linkedMultiValueMap, date);
- HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
- String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class);
- logger.info("Azkaban返回根据时间创建定时任务信息:" + result);
- if (!SysContants.AZK_SUCCESS.equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString())
- || new Gson().fromJson(result, JsonObject.class).get("scheduleId").getAsInt() < 0) {
- logger.error("Azkaban返回根据时间创建定时任务信息失败:!返回错误信息:"+result);
- throw new Exception("根据时间创建定时任务失败");
- }
- return new Gson().fromJson(result, JsonObject.class).get("scheduleId").getAsString();
- }
- /**
- * 根据cron表达式 创建调度任务
- * @param projectName 项目名称
- * @param cron cron表达式
- * @param flow 流
- * @param flowName 流名称
- * @return 返回scheduleId
- * @throws Exception
- */
- public String scheduleByCronEXEaFlow(String projectName, String cron, String flowName)
- throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", CONTENT_TYPE);
- hs.add("X-Requested-With", X_REQUESTED_WITH);
- LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
- linkedMultiValueMap.add("session.id", login());
- linkedMultiValueMap.add("ajax", "scheduleCronFlow");
- linkedMultiValueMap.add("projectName", projectName);
- linkedMultiValueMap.add("cronExpression", cron);
- linkedMultiValueMap.add("flow", flowName);
- HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
- String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class);
- if (!SysContants.AZK_SUCCESS
- .equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString())) {
- logger.error("Azkaban返回根据时间创建定时任务信息失败:!返回错误信息:"+result);
- return null;
- }
- return new Gson().fromJson(result, JsonObject.class).get("scheduleId").getAsString();
- }
- /**
- * 根据scheduleId取消一个流的调度
- * 暂停一次执行,输入为exec id。如果这个执行不是处于running状态,会返回错误信息。
- * @param scheduleId
- * @throws Exception
- */
- public boolean unscheduleFlow(String scheduleId) {
- try {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", CONTENT_TYPE);
- hs.add("X-Requested-With", X_REQUESTED_WITH);
- LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
- linkedMultiValueMap.add("session.id", login());
- linkedMultiValueMap.add("action", "removeSched");
- linkedMultiValueMap.add("scheduleId", scheduleId);
- HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
- String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity,
- String.class);
- if (StringUtils.isBlank(result)) {
- return false;
- }
- if (!SysContants.AZK_SUCCESS
- .equals(String.valueOf(new Gson().fromJson(result, JsonObject.class).get("status")))) {
- logger.error("取消流调度信息失败:{}", scheduleId);
- logger.error("Azkaban取消流调度信息失败失败:!返回错误信息:"+result);
- return false;
- }
- } catch (Exception e) {
- logger.error("取消流调度信息失败:{}", scheduleId);
- return false;
- }
- return true;
- }
- /**
- * 下载Azkaban压缩文件
- * @param projectName 项目名称
- * @param zipPath 文件路径
- * @throws Exception 文件异常
- */
- public void downLoadZip(String projectName, String zipPath) throws Exception {
- OutputStream output = null;
- BufferedOutputStream bufferedOutput = null;
- try {
- URL url = new URL(azkabanConfig.getAzkUrl() + "/manager?session.id=" + login() + "&download=true&project="
- + projectName);
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn.setConnectTimeout(3 * 1000);
- InputStream inputStream = conn.getInputStream();
- File file = new File(zipPath);
- output = new FileOutputStream(file);
- bufferedOutput = new BufferedOutputStream(output);
- bufferedOutput.write(IOUtils.toByteArray(inputStream));
- } catch (Exception e) {
- logger.info("下载Azkaban压缩文件异常:" + e.getMessage(), e);
- } finally {
- if (bufferedOutput != null) {
- try {
- bufferedOutput.flush();
- bufferedOutput.close();
- } catch (IOException e) {
- logger.info("关闭流异常:" + e.getMessage(), e);
- }
- }
- if (output != null) {
- try {
- output.close();
- } catch (IOException e) {
- logger.info("关闭流异常:" + e.getMessage(), e);
- }
- }
- }
- }
- /**
- * 获取一个调度器job的信息 根据project的id 和 flowId
- * @param projectId 项目名称
- * @param flowId 流编号
- * @return job的信息
- */
- public String fetchSchedule(String projectId, String flowId) {
- try {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
- hs.add("X-Requested-With", "XMLHttpRequest");
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", login());
- map.put("projectId", projectId);
- map.put("flowId", flowId);
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl()
- + "/schedule?session.id={id}&ajax=fetchSchedule&projectId={projectId}&flowId={flowId}",
- HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
- if (HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
- logger.error("获取一个调度器job的信息失败:{}:{}", projectId, flowId);
- return null;
- }
- JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("schedule");
- if (obj == null) {
- logger.error("获取一个调度器job的信息失败:{}:{}", projectId, flowId);
- return null;
- }
- return obj.getAsJsonObject().get("scheduleId").getAsString();
- } catch (Exception e) {
- logger.error("获取一个调度器job的信息失败:{}:{}", projectId, flowId);
- }
- return null;
- }
- /**
- * SLA 设置调度任务 执行的时候 或者执行成功失败等等的规则匹配 发邮件或者...
- * @return
- * @throws Exception
- */
- public String setSla() throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
- hs.add("X-Requested-With", "XMLHttpRequest");
- LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
- linkedMultiValueMap.add("session.id", "ffad7355-4427-4770-9c14-3d19736fa73a");
- linkedMultiValueMap.add("ajax", "setSla");
- linkedMultiValueMap.add("scheduleId", "6");
- linkedMultiValueMap.add("slaEmails", "771177@qq.com");
- linkedMultiValueMap.add("settings[0]", "begin,SUCCESS,5:00,true,false");
- linkedMultiValueMap.add("settings[1]", "exe,SUCCESS,5:00,true,false");
- linkedMultiValueMap.add("settings[2]", "end,SUCCESS,5:00,true,false");
- // linkedMultiValueMap.add("settings[3]",
- // "xxx,SUCCESS,5:00,true,false");
- HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
- String postForObject = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class);
- return postForObject;
- }
- /**
- * SLA 获取调度的规则配置
- * @throws Exception
- */
- public void slaInfo() throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
- hs.add("X-Requested-With", "XMLHttpRequest");
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7");
- map.put("scheduleId", "6");
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl() + "/schedule?session.id={id}&ajax=slaInfo&scheduleId={scheduleId}", HttpMethod.GET,
- new HttpEntity<String>(hs), String.class, map);
- System.out.println(exchange.getBody());
- }
- /**
- * Execution 暂停一个执行流
- * @throws Exception
- */
- public void pauseFlow() throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
- hs.add("X-Requested-With", "XMLHttpRequest");
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7");
- map.put("execid", "12");
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=pauseFlow&execid={execid}", HttpMethod.GET,
- new HttpEntity<String>(hs), String.class, map);
- System.out.println(exchange.getBody());
- }
- /**
- * Flow Execution 重新执行一个执行流
- * @throws Exception
- */
- public void resumeFlow() throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
- hs.add("X-Requested-With", "XMLHttpRequest");
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7");
- map.put("execid", "11");
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=resumeFlow&execid={execid}", HttpMethod.GET,
- new HttpEntity<String>(hs), String.class, map);
- System.out.println(exchange.getBody());
- }
- /**
- * 获取一个执行流的详细信息 这个流的每个节点的信息 成功或者失败等等
- * @param execid 执行id
- * @return
- * @throws Exception
- */
- public String fetchexecflow(String execid) throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
- hs.add("X-Requested-With", "XMLHttpRequest");
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", login());
- map.put("execid", execid);
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchexecflow&execid={execid}", HttpMethod.GET,
- new HttpEntity<String>(hs), String.class, map);
- if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
- logger.error("获取一个执行流的详细信息失败:" + execid);
- return null;
- }
- return exchange.getBody();
- }
- /**
- * 获取一个执行流的日志
- * @param execid 执行编号
- * @param jobId job编号
- * @param offset
- * @param length
- * @return
- * @throws Exception
- */
- public String fetchExecJobLogs(String execid,String jobId,String offset,String length) throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
- hs.add("X-Requested-With", "XMLHttpRequest");
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", login());
- map.put("execid", execid);
- map.put("jobId", jobId);
- map.put("offset", offset);
- map.put("length", length);
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchExecJobLogs&execid={execid}&jobId={jobId}&offset={offset}&length={length}",
- HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
- if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
- logger.error("获取一个执行流的详细信息失败:{}:{}", execid,jobId);
- return null;
- }
-
- JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("data");
- if (obj == null) {
- logger.error("获取一个执行流的详细信息为空:{}:{}", execid,jobId);
- return null;
- }
- return obj.getAsString();
- }
-
- /**
- * 获取一个执行流的日志概要
- * @param execid
- * @param offset
- * @param length
- * @return
- * @throws Exception
- */
- public String fetchExecFlowLogs(String execid,String offset,String length) throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
- hs.add("X-Requested-With", "XMLHttpRequest");
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", login());
- map.put("execid", execid);
- map.put("offset", offset);
- map.put("length", length);
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchExecFlowLogs&execid={execid}&offset={offset}&length={length}",
- HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
- if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
- logger.error("获取一个执行流的日志概要信息失败:{}", execid);
- return null;
- }
-
- JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("data");
- if (obj == null) {
- logger.error("获取一个执行流的日志概要信息为空:{}:{}", execid);
- return null;
- }
- return obj.getAsString();
- }
-
- /**
- * 获取执行流的信息状态
- * @throws Exception
- */
- public void fetchexecflowupdate() throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
- hs.add("X-Requested-With", "XMLHttpRequest");
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7");
- map.put("execid", "11");
- map.put("lastUpdateTime", "-1");
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchexecflowupdate&execid={execid}&lastUpdateTime={lastUpdateTime}",
- HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
- System.out.println(exchange.getBody());
- }
-
- private void scheduleTimeInit(LinkedMultiValueMap<String, String> linkedMultiValueMap, Date date) {
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(date);
- Integer year = calendar.get(Calendar.YEAR);
- Integer month = calendar.get(Calendar.MONTH) + 1;
- Integer day = calendar.get(Calendar.DATE);
- Integer hour = calendar.get(Calendar.HOUR_OF_DAY);
- Integer minute = calendar.get(Calendar.MINUTE);
- linkedMultiValueMap.add("scheduleTime", hour + "," + minute + (hour > 11 ? ",pm,PDT" : ",am,EDT"));
- linkedMultiValueMap.add("scheduleDate", month + "/" + day + "/" + year);
- }
- /**
- * 获取azkaban调度 调度概况 (分页查询)
- * @param projectName 工程名称
- * @param start 分页参数
- * @param size 分页参数
- * @return
- * @throws Exception
- * @throws Exception
- */
- public List<GovernTaskRecordBean> getAzkabanExcutions(String projectName,String start,String size) throws Exception {
- List<GovernTaskRecordBean> excInfoList = Lists.newArrayList();
- List<String> flows = fetchFlowsProject(projectName);
- for (String flow : flows) {
- FlowExecution fe = fetchFlowExecutions(projectName, flow, start, size);
- if (fe == null) {
- continue;
- }
- List<Execution> executions = fe.getExecutions();
- for (Execution execution : executions) {
- GovernTaskRecordBean eInfo = new GovernTaskRecordBean();
- eInfo.setCreateTime(new DateTime(execution.getSubmitTime()).toString(formatterTime));
- if (execution.getEndTime() > 0) {
- eInfo.setEndTime(new DateTime(execution.getEndTime()).toString(formatterTime));
- eInfo.setElapsed((execution.getEndTime() - execution.getStartTime()) / 1000);
- } else {
- eInfo.setElapsed((DateTime.now().getMillis() - execution.getStartTime()) / 1000);
- }
- eInfo.setExecId(execution.getExecId());
- eInfo.setFlowId(execution.getFlowId());
- eInfo.setOwner(execution.getSubmitUser());
- eInfo.setProjectId(execution.getProjectId());
- eInfo.setProjectPath(projectName);
- eInfo.setStartTime(new DateTime(execution.getStartTime()).toString(formatterTime));
- if (execution.getStatus().equalsIgnoreCase(ScheduleStatus.FAILED.getDesc())) {
- eInfo.setStatus(ScheduleStatus.FAILED.getCode());
- } else if (execution.getStatus().equalsIgnoreCase(ScheduleStatus.CANCELLED.getDesc())) {
- eInfo.setStatus(ScheduleStatus.CANCELLED.getCode());
- } else if (execution.getStatus().equalsIgnoreCase(ScheduleStatus.KILLED.getDesc())) {
- eInfo.setStatus(ScheduleStatus.KILLED.getCode());
- } else if (execution.getStatus().equalsIgnoreCase(ScheduleStatus.SUCCEEDED.getDesc())) {
- eInfo.setStatus(ScheduleStatus.SUCCEEDED.getCode());
- }
- excInfoList.add(eInfo);
- }
- }
- return excInfoList;
- }
- /**
- * 获取azkaban调度 流的执行情况 (分页)
- * @param excuteId 调度执行id
- * @param start
- * @param size
- * @return
- * @throws Exception
- */
- public ExecutionInfoBean getAzkabanExcutionDetails(String excuteId, String start, String size) throws Exception {
- String result = fetchexecflow(excuteId);
- if (StringUtils.isBlank(result)) {
- throw new CommonException("查询任务流的执行详情失败!");
- }
- ExecutionInfo ei = new Gson().fromJson(result, ExecutionInfo.class);
- if(ei==null) {
- throw new CommonException("查询任务流的执行详情失败!");
- }
- List<ExecNode> nodes = ei.getNodes();
- if (nodes == null || nodes.size() == 0) {
- return null;
- }
- ExecutionInfoBean eib = new ExecutionInfoBean();
- eib.setAttempt(ei.getAttempt());
- eib.setStartTime(new DateTime(ei.getStartTime()).toString(formatterTime));
- if (ei.getEndTime() > 0) {
- eib.setEndTime(new DateTime(ei.getEndTime()).toString(formatterTime));
- eib.setElapsed((ei.getEndTime() - ei.getStartTime()) / 1000);
- } else {
- eib.setElapsed((DateTime.now().getMillis() - ei.getStartTime()) / 1000);
- }
- eib.setExecid(ei.getExecid());
- eib.setFlow(ei.getFlow());
- eib.setFlowId(ei.getFlowId());
- eib.setJobId(ei.getId());
- eib.setNestedId(ei.getNestedId());
- eib.setProject(ei.getProject());
- eib.setProjectId(ei.getProjectId());
- String stats = ScheduleStatus.getDescByCode(ei.getStatus());
- if(StringUtils.isNotBlank(stats)){
- eib.setStatus(stats);
- }
- eib.setSubmitTime(new DateTime(ei.getSubmitTime()).toString(formatterTime));
- eib.setSubmitUser(ei.getSubmitUser());
- eib.setType(ei.getType());
- eib.setUpdateTime(new DateTime(ei.getUpdateTime()).toString(formatterTime));
- String flowLog= fetchExecFlowLogs(excuteId, start, size);
- eib.setFlowLog(flowLog);
- List<ExecNodeBean> nodeBeanList = Lists.newArrayList();
- for(ExecNode node:nodes) {
- ExecNodeBean ebn = new ExecNodeBean();
- ebn.setAttempt(node.getAttempt());
- ebn.setJobId(node.getId());
- ebn.setDependencies(node.getIn());
- ebn.setNestedId(node.getNestedId());
- ebn.setStartTime(new DateTime(ei.getStartTime()).toString(formatterTime));
- if (node.getEndTime() > 0) {
- ebn.setEndTime(new DateTime(node.getEndTime()).toString(formatterTime));
- ebn.setElapsed((node.getEndTime() - node.getStartTime()) / 1000);
- } else {
- ebn.setElapsed((DateTime.now().getMillis() - node.getStartTime()) / 1000);
- }
- String stats2 = ScheduleStatus.getDescByCode(node.getStatus());
- if(StringUtils.isNotBlank(stats2)){
- ebn.setStatus(stats2);
- }
- ebn.setType(node.getType());
- ebn.setUpdateTime(new DateTime(ei.getUpdateTime()).toString(formatterTime));
- String logs = fetchExecJobLogs(excuteId,ebn.getJobId(),start, size);
- ebn.setLogs(logs);
- nodeBeanList.add(ebn);
- }
- eib.setNodes(nodeBeanList);
- return eib;
- }
- /**
- * 获取一个项目的projectId
- * @param projectName
- * @return
- * @throws Exception
- */
- public String fetchProjectId(String projectName) throws Exception {
- SSLUtil.turnOffSslChecking();
- HttpHeaders hs = new HttpHeaders();
- hs.add("Content-Type", CONTENT_TYPE);
- hs.add("X-Requested-With", X_REQUESTED_WITH);
- hs.add("Accept", "text/plain;charset=utf-8");
- Map<String, String> map = new HashMap<>();
- map.put("id", login());
- map.put("project", projectName);
- ResponseEntity<String> exchange = restTemplate.exchange(
- azkabanConfig.getAzkUrl() + "/manager?&session.id={id}&project={project}&ajax=fetchprojectflows", HttpMethod.GET,
- new HttpEntity<String>(hs), String.class, map);
- if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
- logger.error("Azkaban获取一个项目的所有流信息失败:" + projectName);
- return null;
- }
- JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("projectId");
- if (obj == null) {
- logger.error("Azkaban获取一个项目的所有流信息失败:{}:{}", projectName);
- return null;
- }
- String projectId = obj.getAsString();
- if(StringUtils.isBlank(projectId)){
- logger.error("获取Azkaban projectId 异常");
- }
- return projectId;
- }
- /**
- * 获取
- * @param projectName
- * @return
- * @throws Exception
- */
- public String getLastScheduleStatus(String projectName) throws Exception {
- List<String> flows = fetchFlowsProject(projectName);
- if(CollectionUtils.isNotEmpty(flows)) {
- for (String flow : flows) {
- FlowExecution fe = fetchFlowExecutions(projectName, flow, "0", "1000");
- if (fe == null) {
- continue;
- }
- List<Execution> executions = fe.getExecutions();
- if (executions == null || executions.size() == 0) {
- continue;
- }
- String status = executions.get(0).getStatus();
- return status;
- }
- }
- return null;
- }
-
-
- }