经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
springboot集成调用Azkaban
来源:cnblogs  作者:不再_单纯  时间:2018/11/16 10:30:45  对本文有异议

springboot集成调用Azkaban

 

一、 说明

  1.Azkaban是由LinkedIn公司推出的一个批量工作流任务调度器,主要用于在一个工作流内以特定的顺序运行一组工作和流程。它通过简单的key:value对进行配置,并通过配置中的dependencies来设置依赖关系;这个依赖关系必须是无环的,否则会被视为无效的工作流。Azkaban使用job配置文件建立任务之间的依赖关系,并提供一个易于使用的web用户界面来维护和跟踪你的工作流。

        2.springboot版本:2.0.5  azkaban版本:3.59.0

 

二、maven依赖

  1. <dependency>
  2. <groupId>com.google.code.gson</groupId>
  3. <artifactId>gson</artifactId>
  4. <version>2.8.5</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.commons</groupId>
  8. <artifactId>commons-pool2</artifactId>
  9. <version>2.5.0</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.httpcomponents</groupId>
  13. <artifactId>httpcore</artifactId>
  14. <version>4.4.9</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>joda-time</groupId>
  18. <artifactId>joda-time</artifactId>
  19. <version>2.9.7</version>
  20. </dependency>
View Code

 

三、代码

  1.azkaban配置文件(注意:需要在启动类上通过@PropertySource注解引入该配置文件,否则下面的配置项无法被读取)

  1. monitor.azkaban.username=azkaban
  2. monitor.azkaban.password=azkaban
  3. monitor.azkaban.url=http://192.168.11.12:8081
  4. monitor.azkaban.connectTimeout=60000
  5. monitor.azkaban.readTimeout=120000
azkaban.properties

  2.azkaban配置实体类

  1. import org.springframework.beans.factory.annotation.Value;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.http.client.SimpleClientHttpRequestFactory;
  5. import org.springframework.web.client.RestTemplate;
  6. @Configuration
  7. public class AzkabanConfig {
  8. @Value("${monitor.azkaban.username}")
  9. private String azkUsername;
  10. @Value("${monitor.azkaban.password}")
  11. private String azkPassword;
  12. @Value("${monitor.azkaban.url}")
  13. private String azkUrl;
  14. @Value("${monitor.azkaban.connectTimeout}")
  15. private int connectTimeout;
  16. @Value("${monitor.azkaban.readTimeout}")
  17. private int readTimeout;
  18. @Bean
  19. public RestTemplate getRestTemplate() {
  20. SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
  21. requestFactory.setConnectTimeout(connectTimeout);
  22. requestFactory.setReadTimeout(readTimeout);
  23. RestTemplate restTemplate = new RestTemplate(requestFactory);
  24. return restTemplate;
  25. }
  26. public String getAzkUsername() {
  27. return azkUsername;
  28. }
  29. public void setAzkUsername(String azkUsername) {
  30. this.azkUsername = azkUsername;
  31. }
  32. public String getAzkPassword() {
  33. return azkPassword;
  34. }
  35. public void setAzkPassword(String azkPassword) {
  36. this.azkPassword = azkPassword;
  37. }
  38. public String getAzkUrl() {
  39. return azkUrl;
  40. }
  41. public void setAzkUrl(String azkUrl) {
  42. this.azkUrl = azkUrl;
  43. }
  44. }
AzkabanConfig

  3.HttpClient配置SSL绕过https证书 

  1. import java.security.KeyManagementException;
  2. import java.security.NoSuchAlgorithmException;
  3. import java.security.cert.X509Certificate;
  4. import javax.net.ssl.HttpsURLConnection;
  5. import javax.net.ssl.SSLContext;
  6. import javax.net.ssl.TrustManager;
  7. import javax.net.ssl.X509TrustManager;
  /**
   * Switches TLS certificate validation off and on for every
   * {@link HttpsURLConnection} created in this JVM.
   *
   * <p>WARNING: {@link #turnOffSslChecking()} installs a trust manager that accepts
   * any certificate, which removes protection against man-in-the-middle attacks.
   * It changes a JVM-wide default, so it affects all code using
   * {@code HttpsURLConnection}, not only the caller. Hostname verification is not
   * touched by this class.
   */
  public class SSLUtil {

      /** Protocol name passed to {@link SSLContext#getInstance(String)}. */
      private static final String PROTOCOL = "TLS";

      /** Trust manager that performs no checks at all. */
      private static final TrustManager[] UNQUESTIONING_TRUST_MANAGER = new TrustManager[] { new X509TrustManager() {
          @Override
          public X509Certificate[] getAcceptedIssuers() {
              // The X509TrustManager contract requires a non-null (possibly empty) array.
              return new X509Certificate[0];
          }

          @Override
          public void checkClientTrusted(X509Certificate[] certs, String authType) {
              // intentionally empty: every client certificate is accepted
          }

          @Override
          public void checkServerTrusted(X509Certificate[] certs, String authType) {
              // intentionally empty: every server certificate is accepted
          }
      } };

      private SSLUtil() {
          // static utility, not instantiable
      }

      /**
       * Installs a socket factory backed by the trust-all manager as the default
       * for {@link HttpsURLConnection}.
       *
       * @throws NoSuchAlgorithmException if no provider supports {@link #PROTOCOL}
       * @throws KeyManagementException   if the context cannot be initialised
       */
      public static void turnOffSslChecking() throws NoSuchAlgorithmException, KeyManagementException {
          final SSLContext sc = SSLContext.getInstance(PROTOCOL);
          sc.init(null, UNQUESTIONING_TRUST_MANAGER, null);
          HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
      }

      /**
       * Restores certificate validation by installing a socket factory from a
       * context initialised with the platform default trust managers.
       *
       * <p>The previous version initialised a context and threw it away, leaving
       * the trust-all factory in place.
       *
       * @throws KeyManagementException   if the context cannot be initialised
       * @throws NoSuchAlgorithmException if no provider supports {@link #PROTOCOL}
       */
      public static void turnOnSslChecking() throws KeyManagementException, NoSuchAlgorithmException {
          final SSLContext sc = SSLContext.getInstance(PROTOCOL);
          // null key/trust managers and null SecureRandom select the installed defaults
          sc.init(null, null, null);
          HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
      }
  }
SSLUtil

  4.常量类

  /**
   * Shared constants for the Azkaban integration.
   */
  public interface SysContants {

      /** Status value that marks a successful Azkaban response. */
      String AZK_SUCCESS = "success";

  }
SysContants

  5.根据azkaban返回数据定制实体类(注释少抱歉)

  1. import java.util.List;
  /**
   * Data holder for one node of an Azkaban execution, modelled on the data
   * Azkaban returns. All properties are plain mutable bean properties.
   *
   * <p>NOTE(review): the {@code Long} time fields are presumably epoch
   * timestamps as delivered by Azkaban — confirm against the API response.
   */
  public class ExecNode {

      private String nestedId;
      private List<String> in;
      private String status;
      private String id;
      private String type;
      private Long updateTime;
      private Long startTime;
      private Long endTime;
      private Long attempt;

      // ---- identity / structure ---------------------------------------------

      public String getNestedId() { return nestedId; }

      public void setNestedId(String nestedId) { this.nestedId = nestedId; }

      public List<String> getIn() { return in; }

      public void setIn(List<String> in) { this.in = in; }

      public String getStatus() { return status; }

      public void setStatus(String status) { this.status = status; }

      public String getId() { return id; }

      public void setId(String id) { this.id = id; }

      public String getType() { return type; }

      public void setType(String type) { this.type = type; }

      // ---- timing -----------------------------------------------------------

      public Long getUpdateTime() { return updateTime; }

      public void setUpdateTime(Long updateTime) { this.updateTime = updateTime; }

      public Long getStartTime() { return startTime; }

      public void setStartTime(Long startTime) { this.startTime = startTime; }

      public Long getEndTime() { return endTime; }

      public void setEndTime(Long endTime) { this.endTime = endTime; }

      public Long getAttempt() { return attempt; }

      public void setAttempt(Long attempt) { this.attempt = attempt; }
  }
ExecNode
  1. import java.util.List;
  /**
   * Bean describing one execution node with its time fields held as strings,
   * plus the node's log text and elapsed time. All properties are plain
   * mutable bean properties.
   */
  public class ExecNodeBean {

      private String nestedId;
      private List<String> dependencies;
      private String status;
      private String jobId;
      private String type;
      private String updateTime;
      private String startTime;
      private String endTime;
      private Long attempt;
      private String logs;
      private Long elapsed;

      // ---- identity / structure ---------------------------------------------

      public String getNestedId() { return nestedId; }

      public void setNestedId(String nestedId) { this.nestedId = nestedId; }

      public List<String> getDependencies() { return dependencies; }

      public void setDependencies(List<String> dependencies) { this.dependencies = dependencies; }

      public String getStatus() { return status; }

      public void setStatus(String status) { this.status = status; }

      public String getJobId() { return jobId; }

      public void setJobId(String jobId) { this.jobId = jobId; }

      public String getType() { return type; }

      public void setType(String type) { this.type = type; }

      // ---- timing -----------------------------------------------------------

      public String getUpdateTime() { return updateTime; }

      public void setUpdateTime(String updateTime) { this.updateTime = updateTime; }

      public String getStartTime() { return startTime; }

      public void setStartTime(String startTime) { this.startTime = startTime; }

      public String getEndTime() { return endTime; }

      public void setEndTime(String endTime) { this.endTime = endTime; }

      public Long getAttempt() { return attempt; }

      public void setAttempt(Long attempt) { this.attempt = attempt; }

      // ---- logs / duration --------------------------------------------------

      public String getLogs() { return logs; }

      public void setLogs(String logs) { this.logs = logs; }

      public Long getElapsed() { return elapsed; }

      public void setElapsed(Long elapsed) { this.elapsed = elapsed; }
  }
ExecNodeBean
  /**
   * Data holder for a single execution record of a flow, modelled on the data
   * Azkaban returns. All properties are plain mutable bean properties.
   *
   * <p>The original listing was missing the closing brace of the class; it is
   * restored here so the type compiles.
   *
   * <p>NOTE(review): the {@code Long} time fields are presumably epoch
   * timestamps as delivered by Azkaban — confirm against the API response.
   */
  public class Execution {

      private String submitUser;
      private String flowId;
      private String status;
      private Long submitTime;
      private Long startTime;
      private Long endTime;
      private Long projectId;
      private Long execId;

      public String getSubmitUser() { return submitUser; }

      public void setSubmitUser(String submitUser) { this.submitUser = submitUser; }

      public String getFlowId() { return flowId; }

      public void setFlowId(String flowId) { this.flowId = flowId; }

      public String getStatus() { return status; }

      public void setStatus(String status) { this.status = status; }

      public Long getSubmitTime() { return submitTime; }

      public void setSubmitTime(Long submitTime) { this.submitTime = submitTime; }

      public Long getStartTime() { return startTime; }

      public void setStartTime(Long startTime) { this.startTime = startTime; }

      public Long getEndTime() { return endTime; }

      public void setEndTime(Long endTime) { this.endTime = endTime; }

      public Long getProjectId() { return projectId; }

      public void setProjectId(Long projectId) { this.projectId = projectId; }

      public Long getExecId() { return execId; }

      public void setExecId(Long execId) { this.execId = execId; }
  }
Execution
  1. import java.util.List;
  2. public class ExecutionInfo {
  3. private String project;
  4. private String type;
  5. private Long updateTime;
  6. private Long attempt;
  7. private Long execid;
  8. private Long submitTime;
  9. private Long startTime;
  10. private Long endTime;
  11. private Long projectId;
  12. private String nestedId;
  13. private String submitUser;
  14. private String id;
  15. private String flowId;
  16. private String flow;
  17. private String status;
  18. private List<ExecNode> nodes;
  19. public String getProject() {
  20. return project;
  21. }
  22. public void setProject(String project) {
  23. this.project = project;
  24. }
  25. public String getType() {
  26. return type;
  27. }
  28. public void setType(String type) {
  29. this.type = type;
  30. }
  31. public Long getUpdateTime() {
  32. return updateTime;
  33. }
  34. public void setUpdateTime(Long updateTime) {
  35. this.updateTime = updateTime;
  36. }
  37. public Long getAttempt() {
  38. return attempt;
  39. }
  40. public void setAttempt(Long attempt) {
  41. this.attempt = attempt;
  42. }
  43. public Long getExecid() {
  44. return execid;
  45. }
  46. public void setExecid(Long execid) {
  47. this.execid = execid;
  48. }
  49. public Long getSubmitTime() {
  50. return submitTime;
  51. }
  52. public void setSubmitTime(Long submitTime) {
  53. this.submitTime = submitTime;
  54. }
  55. public Long getStartTime() {
  56. return startTime;
  57. }
  58. public void setStartTime(Long startTime) {
  59. this.startTime = startTime;
  60. }
  61. public Long getEndTime() {
  62. return endTime;
  63. }
  64. public void setEndTime(Long endTime) {
  65. this.endTime = endTime;
  66. }
  67. public Long getProjectId() {
  68. return projectId;
  69. }
  70. public void setProjectId(Long projectId) {
  71. this.projectId = projectId;
  72. }
  73. public String getNestedId() {
  74. return nestedId;
  75. }
  76. public void setNestedId(String nestedId) {
  77. this.nestedId = nestedId;
  78. }
  79. public String getSubmitUser() {
  80. return submitUser;
  81. }
  82. public void setSubmitUser(String submitUser) {
  83. this.submitUser = submitUser;
  84. }
  85. public String getId() {
  86. return id;
  87. }
  88. public void setId(String id) {
  89. this.id = id;
  90. }
  91. public String getFlowId() {
  92. return flowId;
  93. }
  94. public void setFlowId(String flowId) {
  95. this.flowId = flowId;
  96. }
  97. public String getFlow() {
  98. return flow;
  99. }
  100. public void setFlow(String flow) {
  101. this.flow = flow;
  102. }
  103. public String getStatus() {
  104. return status;
  105. }
  106. public void setStatus(String status) {
  107. this.status = status;
  108. }
  109. public List<ExecNode> getNodes() {
  110. return nodes;
  111. }
  112. public void setNodes(List<ExecNode> nodes) {
  113. this.nodes = nodes;
  114. }
  115. }
ExecutionInfo
  1. import java.util.List;
  2. public class ExecutionInfoBean{
  3. private String project;
  4. private String type;
  5. private String updateTime;
  6. private Long attempt;
  7. private Long execid;
  8. private String submitTime;
  9. private String startTime;
  10. private String endTime;
  11. private Long projectId;
  12. private String nestedId;
  13. private String submitUser;
  14. private String jobId;
  15. private String flowId;
  16. private String flow;
  17. private String status;
  18. private String flowLog;
  19. private Long elapsed;
  20. private List<ExecNodeBean> nodes;
  21. public String getProject() {
  22. return project;
  23. }
  24. public void setProject(String project) {
  25. this.project = project;
  26. }
  27. public String getType() {
  28. return type;
  29. }
  30. public void setType(String type) {
  31. this.type = type;
  32. }
  33. public String getUpdateTime() {
  34. return updateTime;
  35. }
  36. public void setUpdateTime(String updateTime) {
  37. this.updateTime = updateTime;
  38. }
  39. public Long getAttempt() {
  40. return attempt;
  41. }
  42. public void setAttempt(Long attempt) {
  43. this.attempt = attempt;
  44. }
  45. public Long getExecid() {
  46. return execid;
  47. }
  48. public void setExecid(Long execid) {
  49. this.execid = execid;
  50. }
  51. public String getSubmitTime() {
  52. return submitTime;
  53. }
  54. public void setSubmitTime(String submitTime) {
  55. this.submitTime = submitTime;
  56. }
  57. public String getStartTime() {
  58. return startTime;
  59. }
  60. public void setStartTime(String startTime) {
  61. this.startTime = startTime;
  62. }
  63. public String getEndTime() {
  64. return endTime;
  65. }
  66. public void setEndTime(String endTime) {
  67. this.endTime = endTime;
  68. }
  69. public Long getProjectId() {
  70. return projectId;
  71. }
  72. public void setProjectId(Long projectId) {
  73. this.projectId = projectId;
  74. }
  75. public String getNestedId() {
  76. return nestedId;
  77. }
  78. public void setNestedId(String nestedId) {
  79. this.nestedId = nestedId;
  80. }
  81. public String getSubmitUser() {
  82. return submitUser;
  83. }
  84. public void setSubmitUser(String submitUser) {
  85. this.submitUser = submitUser;
  86. }
  87. public String getJobId() {
  88. return jobId;
  89. }
  90. public void setJobId(String jobId) {
  91. this.jobId = jobId;
  92. }
  93. public String getFlowId() {
  94. return flowId;
  95. }
  96. public void setFlowId(String flowId) {
  97. this.flowId = flowId;
  98. }
  99. public String getFlow() {
  100. return flow;
  101. }
  102. public void setFlow(String flow) {
  103. this.flow = flow;
  104. }
  105. public String getStatus() {
  106. return status;
  107. }
  108. public void setStatus(String status) {
  109. this.status = status;
  110. }
  111. public String getFlowLog() {
  112. return flowLog;
  113. }
  114. public void setFlowLog(String flowLog) {
  115. this.flowLog = flowLog;
  116. }
  117. public Long getElapsed() {
  118. return elapsed;
  119. }
  120. public void setElapsed(Long elapsed) {
  121. this.elapsed = elapsed;
  122. }
  123. public List<ExecNodeBean> getNodes() {
  124. return nodes;
  125. }
  126. public void setNodes(List<ExecNodeBean> nodes) {
  127. this.nodes = nodes;
  128. }
  129. }
ExecutionInfoBean
  1. import java.util.List;
  2. public class FlowExecution {
  3. private String project;
  4. private String flow;
  5. private Long total;
  6. private Long length;
  7. private Long from;
  8. private Long projectId;
  9. private List<Execution> executions;
  10. public String getProject() {
  11. return project;
  12. }
  13. public void setProject(String project) {
  14. this.project = project;
  15. }
  16. public String getFlow() {
  17. return flow;
  18. }
  19. public void setFlow(String flow) {
  20. this.flow = flow;
  21. }
  22. public Long getTotal() {
  23. return total;
  24. }
  25. public void setTotal(Long total) {
  26. this.total = total;
  27. }
  28. public Long getLength() {
  29. return length;
  30. }
  31. public void setLength(Long length) {
  32. this.length = length;
  33. }
  34. public Long getFrom() {
  35. return from;
  36. }
  37. public void setFrom(Long from) {
  38. this.from = from;
  39. }
  40. public Long getProjectId() {
  41. return projectId;
  42. }
  43. public void setProjectId(Long projectId) {
  44. this.projectId = projectId;
  45. }
  46. public List<Execution> getExecutions() {
  47. return executions;
  48. }
  49. public void setExecutions(List<Execution> executions) {
  50. this.executions = executions;
  51. }
  52. }
FlowExecution
  1. public class GovernTaskRecordBean extends PageEntity {
  2. private static final long serialVersionUID = 1L;
  3. private String createTime;
  4. private String status;
  5. private String owner;
  6. private String startTime;
  7. private String endTime;
  8. private String flowId;
  9. private Long projectId;
  10. private Long execId;
  11. private String projectPath;
  12. private Long elapsed;
  13. public String getCreateTime() {
  14. return createTime;
  15. }
  16. public void setCreateTime(String createTime) {
  17. this.createTime = createTime;
  18. }
  19. public String getStatus() {
  20. return status;
  21. }
  22. public void setStatus(String status) {
  23. this.status = status;
  24. }
  25. public String getOwner() {
  26. return owner;
  27. }
  28. public void setOwner(String owner) {
  29. this.owner = owner;
  30. }
  31. public String getStartTime() {
  32. return startTime;
  33. }
  34. public void setStartTime(String startTime) {
  35. this.startTime = startTime;
  36. }
  37. public String getEndTime() {
  38. return endTime;
  39. }
  40. public void setEndTime(String endTime) {
  41. this.endTime = endTime;
  42. }
  43. public String getFlowId() {
  44. return flowId;
  45. }
  46. public void setFlowId(String flowId) {
  47. this.flowId = flowId;
  48. }
  49. public Long getProjectId() {
  50. return projectId;
  51. }
  52. public void setProjectId(Long projectId) {
  53. this.projectId = projectId;
  54. }
  55. public Long getExecId() {
  56. return execId;
  57. }
  58. public void setExecId(Long execId) {
  59. this.execId = execId;
  60. }
  61. public String getProjectPath() {
  62. return projectPath;
  63. }
  64. public void setProjectPath(String projectPath) {
  65. this.projectPath = projectPath;
  66. }
  67. public Long getElapsed() {
  68. return elapsed;
  69. }
  70. public void setElapsed(Long elapsed) {
  71. this.elapsed = elapsed;
  72. }
  73. }
GovernTaskRecordBean

  6.调度执行状态枚举类

  /**
   * Execution states of a scheduled flow/job, each with a code and a
   * human-readable description.
   *
   * <p>NOTE(review): the setters mutate the shared enum constants, so a change
   * is visible to every user of that constant. They are kept for compatibility.
   */
  public enum ScheduleStatus {
      READY("READY", "就绪"),
      SUCCEEDED("SUCCEEDED", "成功"),
      KILLING("KILLING", "停止中"),
      KILLED("KILLED", "已中断"),
      FAILED("FAILED", "失败"),
      SKIPPED("SKIPPED", "跳过"),
      DISABLED("DISABLED", "停用"),
      QUEUED("QUEUED", "等待中"),
      CANCELLED("CANCELLED", "取消执行"),
      RUNNING("RUNNING", "运行中"),
      PAUSED("PAUSED", "暂停");

      /** Status code. */
      private String code;

      /** Status description. */
      private String desc;

      ScheduleStatus(String code, String desc) {
          this.code = code;
          this.desc = desc;
      }

      public String getCode() { return code; }

      public void setCode(String code) { this.code = code; }

      public String getDesc() { return desc; }

      public void setDesc(String desc) { this.desc = desc; }
  }
ScheduleStatus

  7.AzkabanService接口类

  1. import java.io.BufferedOutputStream;
  2. import java.io.File;
  3. import java.io.FileOutputStream;
  4. import java.io.IOException;
  5. import java.io.InputStream;
  6. import java.io.OutputStream;
  7. import java.net.HttpURLConnection;
  8. import java.net.URL;
  9. import java.util.ArrayList;
  10. import java.util.Calendar;
  11. import java.util.Date;
  12. import java.util.HashMap;
  13. import java.util.List;
  14. import java.util.Map;
  15. import org.apache.commons.collections4.CollectionUtils;
  16. import org.apache.commons.io.IOUtils;
  17. import org.apache.commons.lang3.StringUtils;
  18. import org.joda.time.DateTime;
  19. import org.joda.time.format.DateTimeFormat;
  20. import org.joda.time.format.DateTimeFormatter;
  21. import org.slf4j.Logger;
  22. import org.slf4j.LoggerFactory;
  23. import org.springframework.beans.factory.annotation.Autowired;
  24. import org.springframework.core.io.FileSystemResource;
  25. import org.springframework.http.HttpEntity;
  26. import org.springframework.http.HttpHeaders;
  27. import org.springframework.http.HttpMethod;
  28. import org.springframework.http.ResponseEntity;
  29. import org.springframework.stereotype.Service;
  30. import org.springframework.util.LinkedMultiValueMap;
  31. import org.springframework.util.MultiValueMap;
  32. import org.springframework.web.client.RestTemplate;
  33. import com.google.common.collect.Lists;
  34. import com.google.gson.Gson;
  35. import com.google.gson.JsonElement;
  36. import com.google.gson.JsonObject;
  37. import org.apache.http.HttpStatus;
  38. /**
  39. * azkaban接口
  40. * @author hao
  41. *
  42. */
  43. @Service
  44. public class AzkabanService {
  45. private static final Logger logger = LoggerFactory.getLogger(AzkabanService.class);
  46. private static final String CONTENT_TYPE = "application/x-www-form-urlencoded; charset=utf-8";
  47. private static final String X_REQUESTED_WITH = "XMLHttpRequest";
  48. private static final DateTimeFormatter formatterTime = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
  49. @Autowired
  50. private RestTemplate restTemplate;
  51. @Autowired
  52. private AzkabanConfig azkabanConfig;
  53. /**
  54. * Azkaban登录接口,返回sessionId
  55. * @return
  56. * @throws Exception
  57. */
  58. public String login() throws Exception {
  59. SSLUtil.turnOffSslChecking();
  60. HttpHeaders hs = new HttpHeaders();
  61. hs.add("Content-Type", CONTENT_TYPE);
  62. hs.add("X-Requested-With", X_REQUESTED_WITH);
  63. LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
  64. linkedMultiValueMap.add("action", "login");
  65. linkedMultiValueMap.add("username", azkabanConfig.getAzkUsername());
  66. linkedMultiValueMap.add("password", azkabanConfig.getAzkPassword());
  67. HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
  68. String result = restTemplate.postForObject(azkabanConfig.getAzkUrl(), httpEntity, String.class);
  69. if (!SysContants.AZK_SUCCESS.equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString())) {
  70. logger.error("Azkaban登录失败!返回错误信息:"+result);
  71. throw new Exception("Azkaban登录失败!");
  72. }
  73. return new Gson().fromJson(result, JsonObject.class).get("session.id").getAsString();
  74. }
  75. /**
  76. * Azkaban上传zip文件
  77. * @param projectName 项目名称
  78. * @param file 文件
  79. * @return projectId编号
  80. * @throws Exception
  81. */
  82. public String uploadZip(String projectName, File file) throws Exception {
  83. SSLUtil.turnOffSslChecking();
  84. FileSystemResource resource = new FileSystemResource(file);
  85. LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();
  86. linkedMultiValueMap.add("session.id", login());
  87. linkedMultiValueMap.add("ajax", "upload");
  88. linkedMultiValueMap.add("project", projectName);
  89. linkedMultiValueMap.add("file", resource);
  90. String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/manager", linkedMultiValueMap,
  91. String.class);
  92. if (StringUtils.isEmpty(new Gson().fromJson(result, JsonObject.class).get("projectId").getAsString())) {
  93. logger.error("上传文件至Azkaban失败:",projectName,file.getPath());
  94. logger.error("Azkaban上传文件失败!返回错误信息:"+result);
  95. return null;
  96. }
  97. return new Gson().fromJson(result, JsonObject.class).get("projectId").getAsString();
  98. }
  99. /**
  100. * Azkaban创建project
  101. * @param projectName,project名称
  102. * @param description,project描述
  103. * @return 是否成功
  104. * @throws Exception
  105. */
  106. public boolean createProject(String projectName, String description) throws Exception {
  107. SSLUtil.turnOffSslChecking();
  108. HttpHeaders hs = new HttpHeaders();
  109. hs.add("Content-Type", CONTENT_TYPE);
  110. hs.add("X-Requested-With", X_REQUESTED_WITH);
  111. LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
  112. linkedMultiValueMap.add("session.id", login());
  113. linkedMultiValueMap.add("action", "create");
  114. linkedMultiValueMap.add("name", projectName);
  115. linkedMultiValueMap.add("description", description);
  116. HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
  117. logger.info("Azkaban请求信息:" + httpEntity.toString());
  118. String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/manager", httpEntity, String.class);
  119. logger.info("Azkaban返回创建Project信息:" + result);
  120. // 创建成功和已存在,都表示创建成功
  121. if (!SysContants.AZK_SUCCESS
  122. .equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString())) {
  123. if (!"Project already exists."
  124. .equals(new Gson().fromJson(result, JsonObject.class).get("message").getAsString())) {
  125. logger.error("创建Azkaban Project失败:",projectName);
  126. logger.error("创建Azkaban Project失败!返回错误信息:"+result);
  127. return false;
  128. }
  129. }
  130. return true;
  131. }
  132. /**
  133. * Azkaban删除project
  134. * @param projectName 项目名称
  135. * @throws Exception
  136. */
  137. public void deleteProject(String projectName) throws Exception {
  138. SSLUtil.turnOffSslChecking();
  139. HttpHeaders hs = new HttpHeaders();
  140. hs.add("Content-Type", CONTENT_TYPE);
  141. hs.add("X-Requested-With", X_REQUESTED_WITH);
  142. hs.add("Accept", "text/plain;charset=utf-8");
  143. Map<String, String> map = new HashMap<>();
  144. map.put("id", login());
  145. map.put("project", projectName);
  146. ResponseEntity<String> exchange = restTemplate.exchange(
  147. azkabanConfig.getAzkUrl() + "/manager?session.id={id}&delete=true&project={project}", HttpMethod.GET,
  148. new HttpEntity<String>(hs), String.class, map);
  149. if (HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
  150. logger.error("删除Azkaban Project失败!返回错误信息:"+exchange);
  151. throw new Exception("删除Azkaban Project失败");
  152. }
  153. }
  154. /**
  155. * 获取一个项目的所有流flows
  156. * @param projectName 项目名称
  157. * @return List<String> 项目的所有流
  158. * @throws Exception
  159. */
  160. public List<String> fetchFlowsProject(String projectName) throws Exception {
  161. SSLUtil.turnOffSslChecking();
  162. HttpHeaders hs = new HttpHeaders();
  163. hs.add("Content-Type", CONTENT_TYPE);
  164. hs.add("X-Requested-With", X_REQUESTED_WITH);
  165. hs.add("Accept", "text/plain;charset=utf-8");
  166. Map<String, String> map = new HashMap<>();
  167. map.put("id", login());
  168. map.put("project", projectName);
  169. ResponseEntity<String> exchange = restTemplate.exchange(
  170. azkabanConfig.getAzkUrl() + "/manager?&session.id={id}&project={project}&ajax=fetchprojectflows", HttpMethod.GET,
  171. new HttpEntity<String>(hs), String.class, map);
  172. if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
  173. logger.error("Azkaban获取一个项目的所有流信息失败:" + projectName);
  174. logger.error("Azkaban获取一个项目的所有流信息失败:!返回错误信息:"+exchange);
  175. return null;
  176. }
  177. JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("flows");
  178. if (obj == null) {
  179. logger.error("Azkaban获取一个项目的所有流信息失败:{}:{}", projectName);
  180. return null;
  181. }
  182. List<String> flows = new ArrayList<String>();
  183. for(JsonElement jobj:obj.getAsJsonArray()) {
  184. flows.add(jobj.getAsJsonObject().get("flowId").getAsString());
  185. }
  186. return flows;
  187. }
  188. /**
  189. * 获取一个流的所有作业
  190. * @param projectName 项目名
  191. * @param flowId 流id
  192. * @return
  193. * @throws Exception
  194. */
  195. public String fetchJobsFlow(String projectName, String flowId) throws Exception {
  196. SSLUtil.turnOffSslChecking();
  197. HttpHeaders hs = new HttpHeaders();
  198. hs.add("Content-Type", CONTENT_TYPE);
  199. hs.add("X-Requested-With", X_REQUESTED_WITH);
  200. hs.add("Accept", "text/plain;charset=utf-8");
  201. Map<String, String> map = new HashMap<>();
  202. map.put("id", login());
  203. map.put("project", projectName);
  204. map.put("flow", flowId);
  205. ResponseEntity<String> exchange = restTemplate.exchange(
  206. azkabanConfig.getAzkUrl() + "/manager?&session.id={id}&project={project}&flow={flow}&ajax=fetchflowgraph", HttpMethod.GET,
  207. new HttpEntity<String>(hs), String.class, map);
  208. if (exchange == null) {
  209. logger.error("Azkaban获取一个项目的所有流信息失败:" + projectName);
  210. return null;
  211. }
  212. logger.debug("Azkaban获取一个项目的所有流信息:" + exchange);
  213. if (HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
  214. throw new Exception("Azkaban获取一个项目的所有流信息失败");
  215. }
  216. return exchange.toString();
  217. }
  218. /**
  219. * Flow 获取执行的project 列表
  220. * azkaban api 获取流的执行
  221. * @param projectName 项目名
  222. * @param flowId 流id
  223. * @param start
  224. * @param length
  225. * @return
  226. * @throws Exception
  227. */
  228. public FlowExecution fetchFlowExecutions(String projectName, String flowId, String start,String length) throws Exception {
  229. SSLUtil.turnOffSslChecking();
  230. HttpHeaders hs = new HttpHeaders();
  231. hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
  232. hs.add("X-Requested-With", "XMLHttpRequest");
  233. hs.add("Accept", "text/plain;charset=utf-8");
  234. Map<String, String> map = new HashMap<>();
  235. map.put("id", login());
  236. map.put("project", projectName);
  237. map.put("flow", flowId);
  238. map.put("start", String.valueOf(start));
  239. map.put("length", String.valueOf(length));
  240. ResponseEntity<String> exchange = restTemplate.exchange(
  241. azkabanConfig.getAzkUrl() + "/manager?session.id={id}&ajax=fetchFlowExecutions&project={project}&flow={flow}&start={start}&length={length}",
  242. HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
  243. if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
  244. logger.error("Azkaban获取一个项目运行记录信息失败:{}:{}", projectName, flowId);
  245. return null;
  246. }
  247. return new Gson().fromJson(exchange.getBody(), FlowExecution.class);
  248. }
  249. /**
  250. * Flow 获取正在执行的流id
  251. * @param projectName
  252. * @param flowId
  253. * @return
  254. * @throws Exception
  255. */
  256. public String getRunning(String projectName, String flowId) throws Exception {
  257. SSLUtil.turnOffSslChecking();
  258. HttpHeaders hs = new HttpHeaders();
  259. hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
  260. hs.add("X-Requested-With", "XMLHttpRequest");
  261. hs.add("Accept", "text/plain;charset=utf-8");
  262. Map<String, String> map = new HashMap<>();
  263. map.put("id", login());
  264. map.put("project", projectName);
  265. map.put("flow", flowId);
  266. ResponseEntity<String> exchange = restTemplate.exchange(
  267. azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=getRunning&project={project}&flow={flow}", HttpMethod.GET,
  268. new HttpEntity<String>(hs), String.class, map);
  269. return exchange.getBody();
  270. }
  271. /**
  272. * Execute a Flow 执行一个流 还有很多其他参数 具体参考azkabanConfig.getAzkUrl()
  273. *
  274. * @throws KeyManagementException
  275. * @throws NoSuchAlgorithmException
  276. */
  277. public String executeFlow(String projectName, String flowId) throws Exception {
  278. SSLUtil.turnOffSslChecking();
  279. HttpHeaders hs = new HttpHeaders();
  280. hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
  281. hs.add("X-Requested-With", "XMLHttpRequest");
  282. hs.add("Accept", "text/plain;charset=utf-8");
  283. Map<String, String> map = new HashMap<>();
  284. map.put("id", login());
  285. map.put("project", projectName);
  286. map.put("flow", flowId);
  287. ResponseEntity<String> exchange = restTemplate.exchange(
  288. azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=executeFlow&project={project}&flow={flow}", HttpMethod.GET,
  289. new HttpEntity<String>(hs), String.class, map);
  290. if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
  291. logger.error("执行一个流请求失败:{}:{}", projectName, flowId);
  292. return null;
  293. }
  294. JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("execid");
  295. if (obj == null) {
  296. logger.error("执行一个流失败:{}:{}", projectName, flowId);
  297. return null;
  298. }
  299. return obj.getAsString();
  300. }
  301. /**
  302. * Cancel a Flow Execution 取消流程执行
  303. * azkaban api 取消流程执行
  304. * @throws KeyManagementException
  305. * @throws NoSuchAlgorithmException
  306. */
  307. public void cancelEXEaFlow(String projectName,String start,String size) throws Exception {
  308. int flag=0;
  309. List<String> flows = fetchFlowsProject(projectName);//获取所有流
  310. for (String flow : flows) {
  311. FlowExecution fe = fetchFlowExecutions(projectName, flow, start, size);
  312. if (fe == null) {
  313. continue;
  314. }
  315. List<Execution> executions = fe.getExecutions();//获取执行id
  316. for (Execution execution : executions) {
  317. if(null!=execution&&null!=execution.getExecId()&&"RUNNING".equals(execution.getStatus())){//运行中的
  318. SSLUtil.turnOffSslChecking();
  319. HttpHeaders hs = new HttpHeaders();
  320. hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
  321. hs.add("X-Requested-With", "XMLHttpRequest");
  322. hs.add("Accept", "text/plain;charset=utf-8");
  323. Map<String, String> map = new HashMap<>();
  324. map.put("id", login());
  325. map.put("execid", String.valueOf(execution.getExecId()));
  326. ResponseEntity<String> exchange = restTemplate.exchange(
  327. azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=cancelFlow&execid={execid}", HttpMethod.GET,
  328. new HttpEntity<String>(hs), String.class, map);
  329. System.out.println(exchange.getBody());
  330. if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
  331. logger.error("取消执行调度失败,请求azkaban接口异常:"+exchange);
  332. throw new Exception("取消执行调度失败,请求azkaban接口异常:"+exchange);
  333. }
  334. JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("error");
  335. if (obj != null) {
  336. throw new Exception("取消执行调度失败,请刷新列表获取最新调度状态!");
  337. }
  338. flag++;
  339. }
  340. }
  341. }
  342. if(0==flag){
  343. throw new Exception("该调度不是运行中状态,请刷新列表获取最新状态!");
  344. }
  345. }
  346. /**
  347. * 根据时间 创建调度任务
  348. *
  349. * @param projectId
  350. * @param projectName
  351. * @param flow
  352. * @param flowName
  353. * @param recurring,是否循环,on循环
  354. * @param period,循环频率:M:Months,w:Weeks,d:Days,h:Hours,m:Minutes,s:Seconds;如60s,支持分钟的倍数
  355. * @param date,开始时间
  356. * @return 返回scheduleId
  357. * @throws Exception
  358. */
  359. public String scheduleEXEaFlow(String projectId, String projectName, String flow, String flowName, String recurring,
  360. String period, Date date) throws Exception {
  361. SSLUtil.turnOffSslChecking();
  362. HttpHeaders hs = new HttpHeaders();
  363. hs.add("Content-Type", CONTENT_TYPE);
  364. hs.add("X-Requested-With", X_REQUESTED_WITH);
  365. LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
  366. linkedMultiValueMap.add("session.id", login());
  367. linkedMultiValueMap.add("ajax", "scheduleFlow");
  368. linkedMultiValueMap.add("projectName", projectName);
  369. linkedMultiValueMap.add("projectId", projectId);
  370. linkedMultiValueMap.add("flow", flow);
  371. linkedMultiValueMap.add("flowName", flowName);
  372. linkedMultiValueMap.add("is_recurring", recurring);
  373. linkedMultiValueMap.add("period", period);
  374. scheduleTimeInit(linkedMultiValueMap, date);
  375. HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
  376. String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class);
  377. logger.info("Azkaban返回根据时间创建定时任务信息:" + result);
  378. if (!SysContants.AZK_SUCCESS.equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString())
  379. || new Gson().fromJson(result, JsonObject.class).get("scheduleId").getAsInt() < 0) {
  380. logger.error("Azkaban返回根据时间创建定时任务信息失败:!返回错误信息:"+result);
  381. throw new Exception("根据时间创建定时任务失败");
  382. }
  383. return new Gson().fromJson(result, JsonObject.class).get("scheduleId").getAsString();
  384. }
  385. /**
  386. * 根据cron表达式 创建调度任务
  387. * @param projectName 项目名称
  388. * @param cron cron表达式
  389. * @param flow 流
  390. * @param flowName 流名称
  391. * @return 返回scheduleId
  392. * @throws Exception
  393. */
  394. public String scheduleByCronEXEaFlow(String projectName, String cron, String flowName)
  395. throws Exception {
  396. SSLUtil.turnOffSslChecking();
  397. HttpHeaders hs = new HttpHeaders();
  398. hs.add("Content-Type", CONTENT_TYPE);
  399. hs.add("X-Requested-With", X_REQUESTED_WITH);
  400. LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
  401. linkedMultiValueMap.add("session.id", login());
  402. linkedMultiValueMap.add("ajax", "scheduleCronFlow");
  403. linkedMultiValueMap.add("projectName", projectName);
  404. linkedMultiValueMap.add("cronExpression", cron);
  405. linkedMultiValueMap.add("flow", flowName);
  406. HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
  407. String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class);
  408. if (!SysContants.AZK_SUCCESS
  409. .equals(new Gson().fromJson(result, JsonObject.class).get("status").getAsString())) {
  410. logger.error("Azkaban返回根据时间创建定时任务信息失败:!返回错误信息:"+result);
  411. return null;
  412. }
  413. return new Gson().fromJson(result, JsonObject.class).get("scheduleId").getAsString();
  414. }
  415. /**
  416. * 根据scheduleId取消一个流的调度
  417. * 暂停一次执行,输入为exec id。如果这个执行不是处于running状态,会返回错误信息。
  418. * @param scheduleId
  419. * @throws Exception
  420. */
  421. public boolean unscheduleFlow(String scheduleId) {
  422. try {
  423. SSLUtil.turnOffSslChecking();
  424. HttpHeaders hs = new HttpHeaders();
  425. hs.add("Content-Type", CONTENT_TYPE);
  426. hs.add("X-Requested-With", X_REQUESTED_WITH);
  427. LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
  428. linkedMultiValueMap.add("session.id", login());
  429. linkedMultiValueMap.add("action", "removeSched");
  430. linkedMultiValueMap.add("scheduleId", scheduleId);
  431. HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
  432. String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity,
  433. String.class);
  434. if (StringUtils.isBlank(result)) {
  435. return false;
  436. }
  437. if (!SysContants.AZK_SUCCESS
  438. .equals(String.valueOf(new Gson().fromJson(result, JsonObject.class).get("status")))) {
  439. logger.error("取消流调度信息失败:{}", scheduleId);
  440. logger.error("Azkaban取消流调度信息失败失败:!返回错误信息:"+result);
  441. return false;
  442. }
  443. } catch (Exception e) {
  444. logger.error("取消流调度信息失败:{}", scheduleId);
  445. return false;
  446. }
  447. return true;
  448. }
  449. /**
  450. * 下载Azkaban压缩文件
  451. * @param projectName 项目名称
  452. * @param zipPath 文件路径
  453. * @throws Exception 文件异常
  454. */
  455. public void downLoadZip(String projectName, String zipPath) throws Exception {
  456. OutputStream output = null;
  457. BufferedOutputStream bufferedOutput = null;
  458. try {
  459. URL url = new URL(azkabanConfig.getAzkUrl() + "/manager?session.id=" + login() + "&download=true&project="
  460. + projectName);
  461. HttpURLConnection conn = (HttpURLConnection) url.openConnection();
  462. conn.setConnectTimeout(3 * 1000);
  463. InputStream inputStream = conn.getInputStream();
  464. File file = new File(zipPath);
  465. output = new FileOutputStream(file);
  466. bufferedOutput = new BufferedOutputStream(output);
  467. bufferedOutput.write(IOUtils.toByteArray(inputStream));
  468. } catch (Exception e) {
  469. logger.info("下载Azkaban压缩文件异常:" + e.getMessage(), e);
  470. } finally {
  471. if (bufferedOutput != null) {
  472. try {
  473. bufferedOutput.flush();
  474. bufferedOutput.close();
  475. } catch (IOException e) {
  476. logger.info("关闭流异常:" + e.getMessage(), e);
  477. }
  478. }
  479. if (output != null) {
  480. try {
  481. output.close();
  482. } catch (IOException e) {
  483. logger.info("关闭流异常:" + e.getMessage(), e);
  484. }
  485. }
  486. }
  487. }
  488. /**
  489. * 获取一个调度器job的信息 根据project的id 和 flowId
  490. * @param projectId 项目名称
  491. * @param flowId 流编号
  492. * @return job的信息
  493. */
  494. public String fetchSchedule(String projectId, String flowId) {
  495. try {
  496. SSLUtil.turnOffSslChecking();
  497. HttpHeaders hs = new HttpHeaders();
  498. hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
  499. hs.add("X-Requested-With", "XMLHttpRequest");
  500. hs.add("Accept", "text/plain;charset=utf-8");
  501. Map<String, String> map = new HashMap<>();
  502. map.put("id", login());
  503. map.put("projectId", projectId);
  504. map.put("flowId", flowId);
  505. ResponseEntity<String> exchange = restTemplate.exchange(
  506. azkabanConfig.getAzkUrl()
  507. + "/schedule?session.id={id}&ajax=fetchSchedule&projectId={projectId}&flowId={flowId}",
  508. HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
  509. if (HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
  510. logger.error("获取一个调度器job的信息失败:{}:{}", projectId, flowId);
  511. return null;
  512. }
  513. JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("schedule");
  514. if (obj == null) {
  515. logger.error("获取一个调度器job的信息失败:{}:{}", projectId, flowId);
  516. return null;
  517. }
  518. return obj.getAsJsonObject().get("scheduleId").getAsString();
  519. } catch (Exception e) {
  520. logger.error("获取一个调度器job的信息失败:{}:{}", projectId, flowId);
  521. }
  522. return null;
  523. }
  524. /**
  525. * SLA 设置调度任务 执行的时候 或者执行成功失败等等的规则匹配 发邮件或者...
  526. * @return
  527. * @throws Exception
  528. */
  529. public String setSla() throws Exception {
  530. SSLUtil.turnOffSslChecking();
  531. HttpHeaders hs = new HttpHeaders();
  532. hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
  533. hs.add("X-Requested-With", "XMLHttpRequest");
  534. LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
  535. linkedMultiValueMap.add("session.id", "ffad7355-4427-4770-9c14-3d19736fa73a");
  536. linkedMultiValueMap.add("ajax", "setSla");
  537. linkedMultiValueMap.add("scheduleId", "6");
  538. linkedMultiValueMap.add("slaEmails", "771177@qq.com");
  539. linkedMultiValueMap.add("settings[0]", "begin,SUCCESS,5:00,true,false");
  540. linkedMultiValueMap.add("settings[1]", "exe,SUCCESS,5:00,true,false");
  541. linkedMultiValueMap.add("settings[2]", "end,SUCCESS,5:00,true,false");
  542. // linkedMultiValueMap.add("settings[3]",
  543. // "xxx,SUCCESS,5:00,true,false");
  544. HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
  545. String postForObject = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class);
  546. return postForObject;
  547. }
  548. /**
  549. * SLA 获取调度的规则配置
  550. * @throws Exception
  551. */
  552. public void slaInfo() throws Exception {
  553. SSLUtil.turnOffSslChecking();
  554. HttpHeaders hs = new HttpHeaders();
  555. hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
  556. hs.add("X-Requested-With", "XMLHttpRequest");
  557. hs.add("Accept", "text/plain;charset=utf-8");
  558. Map<String, String> map = new HashMap<>();
  559. map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7");
  560. map.put("scheduleId", "6");
  561. ResponseEntity<String> exchange = restTemplate.exchange(
  562. azkabanConfig.getAzkUrl() + "/schedule?session.id={id}&ajax=slaInfo&scheduleId={scheduleId}", HttpMethod.GET,
  563. new HttpEntity<String>(hs), String.class, map);
  564. System.out.println(exchange.getBody());
  565. }
  566. /**
  567. * Execution 暂停一个执行流
  568. * @throws Exception
  569. */
  570. public void pauseFlow() throws Exception {
  571. SSLUtil.turnOffSslChecking();
  572. HttpHeaders hs = new HttpHeaders();
  573. hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
  574. hs.add("X-Requested-With", "XMLHttpRequest");
  575. hs.add("Accept", "text/plain;charset=utf-8");
  576. Map<String, String> map = new HashMap<>();
  577. map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7");
  578. map.put("execid", "12");
  579. ResponseEntity<String> exchange = restTemplate.exchange(
  580. azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=pauseFlow&execid={execid}", HttpMethod.GET,
  581. new HttpEntity<String>(hs), String.class, map);
  582. System.out.println(exchange.getBody());
  583. }
  584. /**
  585. * Flow Execution 重新执行一个执行流
  586. * @throws Exception
  587. */
  588. public void resumeFlow() throws Exception {
  589. SSLUtil.turnOffSslChecking();
  590. HttpHeaders hs = new HttpHeaders();
  591. hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
  592. hs.add("X-Requested-With", "XMLHttpRequest");
  593. hs.add("Accept", "text/plain;charset=utf-8");
  594. Map<String, String> map = new HashMap<>();
  595. map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7");
  596. map.put("execid", "11");
  597. ResponseEntity<String> exchange = restTemplate.exchange(
  598. azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=resumeFlow&execid={execid}", HttpMethod.GET,
  599. new HttpEntity<String>(hs), String.class, map);
  600. System.out.println(exchange.getBody());
  601. }
  602. /**
  603. * 获取一个执行流的详细信息 这个流的每个节点的信息 成功或者失败等等
  604. * @param execid 执行id
  605. * @return
  606. * @throws Exception
  607. */
  608. public String fetchexecflow(String execid) throws Exception {
  609. SSLUtil.turnOffSslChecking();
  610. HttpHeaders hs = new HttpHeaders();
  611. hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
  612. hs.add("X-Requested-With", "XMLHttpRequest");
  613. hs.add("Accept", "text/plain;charset=utf-8");
  614. Map<String, String> map = new HashMap<>();
  615. map.put("id", login());
  616. map.put("execid", execid);
  617. ResponseEntity<String> exchange = restTemplate.exchange(
  618. azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchexecflow&execid={execid}", HttpMethod.GET,
  619. new HttpEntity<String>(hs), String.class, map);
  620. if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
  621. logger.error("获取一个执行流的详细信息失败:" + execid);
  622. return null;
  623. }
  624. return exchange.getBody();
  625. }
  626. /**
  627. * 获取一个执行流的日志
  628. * @param execid 执行编号
  629. * @param jobId job编号
  630. * @param offset
  631. * @param length
  632. * @return
  633. * @throws Exception
  634. */
  635. public String fetchExecJobLogs(String execid,String jobId,String offset,String length) throws Exception {
  636. SSLUtil.turnOffSslChecking();
  637. HttpHeaders hs = new HttpHeaders();
  638. hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
  639. hs.add("X-Requested-With", "XMLHttpRequest");
  640. hs.add("Accept", "text/plain;charset=utf-8");
  641. Map<String, String> map = new HashMap<>();
  642. map.put("id", login());
  643. map.put("execid", execid);
  644. map.put("jobId", jobId);
  645. map.put("offset", offset);
  646. map.put("length", length);
  647. ResponseEntity<String> exchange = restTemplate.exchange(
  648. azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchExecJobLogs&execid={execid}&jobId={jobId}&offset={offset}&length={length}",
  649. HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
  650. if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
  651. logger.error("获取一个执行流的详细信息失败:{}:{}", execid,jobId);
  652. return null;
  653. }
  654. JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("data");
  655. if (obj == null) {
  656. logger.error("获取一个执行流的详细信息为空:{}:{}", execid,jobId);
  657. return null;
  658. }
  659. return obj.getAsString();
  660. }
  661. /**
  662. * 获取一个执行流的日志概要
  663. * @param execid
  664. * @param offset
  665. * @param length
  666. * @return
  667. * @throws Exception
  668. */
  669. public String fetchExecFlowLogs(String execid,String offset,String length) throws Exception {
  670. SSLUtil.turnOffSslChecking();
  671. HttpHeaders hs = new HttpHeaders();
  672. hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
  673. hs.add("X-Requested-With", "XMLHttpRequest");
  674. hs.add("Accept", "text/plain;charset=utf-8");
  675. Map<String, String> map = new HashMap<>();
  676. map.put("id", login());
  677. map.put("execid", execid);
  678. map.put("offset", offset);
  679. map.put("length", length);
  680. ResponseEntity<String> exchange = restTemplate.exchange(
  681. azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchExecFlowLogs&execid={execid}&offset={offset}&length={length}",
  682. HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
  683. if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
  684. logger.error("获取一个执行流的日志概要信息失败:{}", execid);
  685. return null;
  686. }
  687. JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("data");
  688. if (obj == null) {
  689. logger.error("获取一个执行流的日志概要信息为空:{}:{}", execid);
  690. return null;
  691. }
  692. return obj.getAsString();
  693. }
  694. /**
  695. * 获取执行流的信息状态
  696. * @throws Exception
  697. */
  698. public void fetchexecflowupdate() throws Exception {
  699. SSLUtil.turnOffSslChecking();
  700. HttpHeaders hs = new HttpHeaders();
  701. hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
  702. hs.add("X-Requested-With", "XMLHttpRequest");
  703. hs.add("Accept", "text/plain;charset=utf-8");
  704. Map<String, String> map = new HashMap<>();
  705. map.put("id", "c4adf192-dcf4-4e05-bd08-f6989dc544a7");
  706. map.put("execid", "11");
  707. map.put("lastUpdateTime", "-1");
  708. ResponseEntity<String> exchange = restTemplate.exchange(
  709. azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchexecflowupdate&execid={execid}&lastUpdateTime={lastUpdateTime}",
  710. HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
  711. System.out.println(exchange.getBody());
  712. }
  713. private void scheduleTimeInit(LinkedMultiValueMap<String, String> linkedMultiValueMap, Date date) {
  714. Calendar calendar = Calendar.getInstance();
  715. calendar.setTime(date);
  716. Integer year = calendar.get(Calendar.YEAR);
  717. Integer month = calendar.get(Calendar.MONTH) + 1;
  718. Integer day = calendar.get(Calendar.DATE);
  719. Integer hour = calendar.get(Calendar.HOUR_OF_DAY);
  720. Integer minute = calendar.get(Calendar.MINUTE);
  721. linkedMultiValueMap.add("scheduleTime", hour + "," + minute + (hour > 11 ? ",pm,PDT" : ",am,EDT"));
  722. linkedMultiValueMap.add("scheduleDate", month + "/" + day + "/" + year);
  723. }
  724. /**
  725. * 获取azkaban调度 调度概况 (分页查询)
  726. * @param projectName 工程名称
  727. * @param start 分页参数
  728. * @param size 分页参数
  729. * @return
  730. * @throws Exception
  731. * @throws Exception
  732. */
  733. public List<GovernTaskRecordBean> getAzkabanExcutions(String projectName,String start,String size) throws Exception {
  734. List<GovernTaskRecordBean> excInfoList = Lists.newArrayList();
  735. List<String> flows = fetchFlowsProject(projectName);
  736. for (String flow : flows) {
  737. FlowExecution fe = fetchFlowExecutions(projectName, flow, start, size);
  738. if (fe == null) {
  739. continue;
  740. }
  741. List<Execution> executions = fe.getExecutions();
  742. for (Execution execution : executions) {
  743. GovernTaskRecordBean eInfo = new GovernTaskRecordBean();
  744. eInfo.setCreateTime(new DateTime(execution.getSubmitTime()).toString(formatterTime));
  745. if (execution.getEndTime() > 0) {
  746. eInfo.setEndTime(new DateTime(execution.getEndTime()).toString(formatterTime));
  747. eInfo.setElapsed((execution.getEndTime() - execution.getStartTime()) / 1000);
  748. } else {
  749. eInfo.setElapsed((DateTime.now().getMillis() - execution.getStartTime()) / 1000);
  750. }
  751. eInfo.setExecId(execution.getExecId());
  752. eInfo.setFlowId(execution.getFlowId());
  753. eInfo.setOwner(execution.getSubmitUser());
  754. eInfo.setProjectId(execution.getProjectId());
  755. eInfo.setProjectPath(projectName);
  756. eInfo.setStartTime(new DateTime(execution.getStartTime()).toString(formatterTime));
  757. if (execution.getStatus().equalsIgnoreCase(ScheduleStatus.FAILED.getDesc())) {
  758. eInfo.setStatus(ScheduleStatus.FAILED.getCode());
  759. } else if (execution.getStatus().equalsIgnoreCase(ScheduleStatus.CANCELLED.getDesc())) {
  760. eInfo.setStatus(ScheduleStatus.CANCELLED.getCode());
  761. } else if (execution.getStatus().equalsIgnoreCase(ScheduleStatus.KILLED.getDesc())) {
  762. eInfo.setStatus(ScheduleStatus.KILLED.getCode());
  763. } else if (execution.getStatus().equalsIgnoreCase(ScheduleStatus.SUCCEEDED.getDesc())) {
  764. eInfo.setStatus(ScheduleStatus.SUCCEEDED.getCode());
  765. }
  766. excInfoList.add(eInfo);
  767. }
  768. }
  769. return excInfoList;
  770. }
  771. /**
  772. * 获取azkaban调度 流的执行情况 (分页)
  773. * @param excuteId 调度执行id
  774. * @param start
  775. * @param size
  776. * @return
  777. * @throws Exception
  778. */
  779. public ExecutionInfoBean getAzkabanExcutionDetails(String excuteId, String start, String size) throws Exception {
  780. String result = fetchexecflow(excuteId);
  781. if (StringUtils.isBlank(result)) {
  782. throw new CommonException("查询任务流的执行详情失败!");
  783. }
  784. ExecutionInfo ei = new Gson().fromJson(result, ExecutionInfo.class);
  785. if(ei==null) {
  786. throw new CommonException("查询任务流的执行详情失败!");
  787. }
  788. List<ExecNode> nodes = ei.getNodes();
  789. if (nodes == null || nodes.size() == 0) {
  790. return null;
  791. }
  792. ExecutionInfoBean eib = new ExecutionInfoBean();
  793. eib.setAttempt(ei.getAttempt());
  794. eib.setStartTime(new DateTime(ei.getStartTime()).toString(formatterTime));
  795. if (ei.getEndTime() > 0) {
  796. eib.setEndTime(new DateTime(ei.getEndTime()).toString(formatterTime));
  797. eib.setElapsed((ei.getEndTime() - ei.getStartTime()) / 1000);
  798. } else {
  799. eib.setElapsed((DateTime.now().getMillis() - ei.getStartTime()) / 1000);
  800. }
  801. eib.setExecid(ei.getExecid());
  802. eib.setFlow(ei.getFlow());
  803. eib.setFlowId(ei.getFlowId());
  804. eib.setJobId(ei.getId());
  805. eib.setNestedId(ei.getNestedId());
  806. eib.setProject(ei.getProject());
  807. eib.setProjectId(ei.getProjectId());
  808. String stats = ScheduleStatus.getDescByCode(ei.getStatus());
  809. if(StringUtils.isNotBlank(stats)){
  810. eib.setStatus(stats);
  811. }
  812. eib.setSubmitTime(new DateTime(ei.getSubmitTime()).toString(formatterTime));
  813. eib.setSubmitUser(ei.getSubmitUser());
  814. eib.setType(ei.getType());
  815. eib.setUpdateTime(new DateTime(ei.getUpdateTime()).toString(formatterTime));
  816. String flowLog= fetchExecFlowLogs(excuteId, start, size);
  817. eib.setFlowLog(flowLog);
  818. List<ExecNodeBean> nodeBeanList = Lists.newArrayList();
  819. for(ExecNode node:nodes) {
  820. ExecNodeBean ebn = new ExecNodeBean();
  821. ebn.setAttempt(node.getAttempt());
  822. ebn.setJobId(node.getId());
  823. ebn.setDependencies(node.getIn());
  824. ebn.setNestedId(node.getNestedId());
  825. ebn.setStartTime(new DateTime(ei.getStartTime()).toString(formatterTime));
  826. if (node.getEndTime() > 0) {
  827. ebn.setEndTime(new DateTime(node.getEndTime()).toString(formatterTime));
  828. ebn.setElapsed((node.getEndTime() - node.getStartTime()) / 1000);
  829. } else {
  830. ebn.setElapsed((DateTime.now().getMillis() - node.getStartTime()) / 1000);
  831. }
  832. String stats2 = ScheduleStatus.getDescByCode(node.getStatus());
  833. if(StringUtils.isNotBlank(stats2)){
  834. ebn.setStatus(stats2);
  835. }
  836. ebn.setType(node.getType());
  837. ebn.setUpdateTime(new DateTime(ei.getUpdateTime()).toString(formatterTime));
  838. String logs = fetchExecJobLogs(excuteId,ebn.getJobId(),start, size);
  839. ebn.setLogs(logs);
  840. nodeBeanList.add(ebn);
  841. }
  842. eib.setNodes(nodeBeanList);
  843. return eib;
  844. }
  845. /**
  846. * 获取一个项目的projectId
  847. * @param projectName
  848. * @return
  849. * @throws Exception
  850. */
  851. public String fetchProjectId(String projectName) throws Exception {
  852. SSLUtil.turnOffSslChecking();
  853. HttpHeaders hs = new HttpHeaders();
  854. hs.add("Content-Type", CONTENT_TYPE);
  855. hs.add("X-Requested-With", X_REQUESTED_WITH);
  856. hs.add("Accept", "text/plain;charset=utf-8");
  857. Map<String, String> map = new HashMap<>();
  858. map.put("id", login());
  859. map.put("project", projectName);
  860. ResponseEntity<String> exchange = restTemplate.exchange(
  861. azkabanConfig.getAzkUrl() + "/manager?&session.id={id}&project={project}&ajax=fetchprojectflows", HttpMethod.GET,
  862. new HttpEntity<String>(hs), String.class, map);
  863. if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
  864. logger.error("Azkaban获取一个项目的所有流信息失败:" + projectName);
  865. return null;
  866. }
  867. JsonElement obj = new Gson().fromJson(exchange.getBody(), JsonObject.class).get("projectId");
  868. if (obj == null) {
  869. logger.error("Azkaban获取一个项目的所有流信息失败:{}:{}", projectName);
  870. return null;
  871. }
  872. String projectId = obj.getAsString();
  873. if(StringUtils.isBlank(projectId)){
  874. logger.error("获取Azkaban projectId 异常");
  875. }
  876. return projectId;
  877. }
  878. /**
  879. * 获取
  880. * @param projectName
  881. * @return
  882. * @throws Exception
  883. */
  884. public String getLastScheduleStatus(String projectName) throws Exception {
  885. List<String> flows = fetchFlowsProject(projectName);
  886. if(CollectionUtils.isNotEmpty(flows)) {
  887. for (String flow : flows) {
  888. FlowExecution fe = fetchFlowExecutions(projectName, flow, "0", "1000");
  889. if (fe == null) {
  890. continue;
  891. }
  892. List<Execution> executions = fe.getExecutions();
  893. if (executions == null || executions.size() == 0) {
  894. continue;
  895. }
  896. String status = executions.get(0).getStatus();
  897. return status;
  898. }
  899. }
  900. return null;
  901. }
  902. }
AzkabanService

  8.调用demo

  1. /**
  2. * 根据flowId 立即执行任务
  3. * @param projectName 项目名称
  4. * @param flow 流id
  5. * @throws CommonException 阿兹卡班异常
  6. */
  7. private void excuteFlowImmediately(String projectName, String flow) throws CommonException {
  8. try {
  9. azkabanService.executeFlow(projectName,flow);
  10. } catch (Exception e) {
  11. throw new CommonException("调度初始化完毕,立即执行任务异常",e);
  12. }
  13. }
demo

 

四、注意

  1.如遇报错,请查看azkaban的相关日志。

  2.如果pom文件中的jar包依赖不全,请在评论区留言。

 

 

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号