经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Hadoop » 查看文章
Hadoop学习(2)-java客户端操作hdfs及secondarynode作用
来源:cnblogs  作者:两千个秘密  时间:2019/7/24 8:36:47  对本文有异议

首先要在windows下解压一个windows版本的hadoop

然后在配置他的环境变量,同时要把hadoop的share目录下的hadoop下的相关jar包拷贝到esclipe

然后Build Path

下面上代码

  1. import java.io.BufferedReader;
  2. import java.io.FileInputStream;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. import java.net.URI;
  6. import java.net.URISyntaxException;
  7. import java.util.Arrays;
  8. import org.apache.hadoop.conf.Configuration;
  9. import org.apache.hadoop.fs.FSDataInputStream;
  10. import org.apache.hadoop.fs.FSDataOutputStream;
  11. import org.apache.hadoop.fs.FileStatus;
  12. import org.apache.hadoop.fs.FileSystem;
  13. import org.apache.hadoop.fs.LocatedFileStatus;
  14. import org.apache.hadoop.fs.Path;
  15. import org.apache.hadoop.fs.RemoteIterator;
  16. import org.junit.Before;
  17. import org.junit.Test;
  18. public class HdfsClientDemo {
  19. public static void main(String[] args) throws Exception {
  20. /**
  21. * Configuration参数对象的机制:
  22. * 构造时,会加载jar包中的默认配置 xx-default.xml
  23. * 再加载 用户配置xx-site.xml ,覆盖掉默认参数
  24. * 构造完成之后,还可以conf.set("p","v"),会再次覆盖用户配置文件中的参数值
  25. */
  26. // new Configuration()会从项目的classpath中加载core-default.xml hdfs-default.xml core-site.xml hdfs-site.xml等文件
  27. Configuration conf = new Configuration();
  28. // 指定本客户端上传文件到hdfs时需要保存的副本数为:2
  29. conf.set("dfs.replication", "2");
  30. // 指定本客户端上传文件到hdfs时切块的规格大小:64M
  31. conf.set("dfs.blocksize", "64m");
  32. // 构造一个访问指定HDFS系统的客户端对象: 参数1:——HDFS系统的URI,参数2:——客户端要特别指定的参数,参数3:客户端的身份(用户名)
  33. FileSystem fs = FileSystem.get(new URI("hdfs://172.31.2.38:9000/"), conf, "root");
  34. // 上传一个文件到HDFS中
  35. fs.copyFromLocalFile(new Path("D:/install-pkgs/hbase-1.2.1-bin.tar.gz"), new Path("/aaa/"));
  36. fs.close();
  37. }
  38. FileSystem fs = null;
  39. @Before
  40. public void init() throws Exception{
  41. Configuration conf = new Configuration();
  42. conf.set("dfs.replication", "2");
  43. conf.set("dfs.blocksize", "64m");
  44. fs = FileSystem.get(new URI("hdfs://172.31.2.38:9000/"), conf, "root");
  45. }
  46. /**
  47. * 从HDFS中下载文件到客户端本地磁盘
  48. * @throws IOException
  49. * @throws IllegalArgumentException
  50. */
  51. @Test
  52. public void testGet() throws IllegalArgumentException, IOException{
  53. fs.copyToLocalFile(new Path("/test"), new Path("d:/"));
  54. fs.close();
  55. }
  56. /**
  57. * 在hdfs内部移动文件\修改名称
  58. */
  59. @Test
  60. public void testRename() throws Exception{
  61. fs.rename(new Path("/install.log"), new Path("/aaa/in.log"));
  62. fs.close();
  63. }
  64. /**
  65. * 在hdfs中创建文件夹
  66. */
  67. @Test
  68. public void testMkdir() throws Exception{
  69. fs.mkdirs(new Path("/xx/yy/zz"));
  70. fs.close();
  71. }
  72. /**
  73. * 在hdfs中删除文件或文件夹
  74. */
  75. @Test
  76. public void testRm() throws Exception{
  77. fs.delete(new Path("/aaa"), true);
  78. fs.close();
  79. }
  80. /**
  81. * 查询hdfs指定目录下的文件信息
  82. */
  83. @Test
  84. public void testLs() throws Exception{
  85. // 只查询文件的信息,不返回文件夹的信息
  86. RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path("/"), true);
  87. while(iter.hasNext()){
  88. LocatedFileStatus status = iter.next();
  89. System.out.println("文件全路径:"+status.getPath());
  90. System.out.println("块大小:"+status.getBlockSize());
  91. System.out.println("文件长度:"+status.getLen());
  92. System.out.println("副本数量:"+status.getReplication());
  93. System.out.println("块信息:"+Arrays.toString(status.getBlockLocations()));
  94. System.out.println("--------------------------------");
  95. }
  96. fs.close();
  97. }
  98. /**
  99. * 读取hdfs中的文件的内容
  100. *
  101. * @throws IOException
  102. * @throws IllegalArgumentException
  103. */
  104. @Test
  105. public void testReadData() throws IllegalArgumentException, IOException {
  106. FSDataInputStream in = fs.open(new Path("/test.txt"));
  107. BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8"));
  108. String line = null;
  109. while ((line = br.readLine()) != null) {
  110. System.out.println(line);
  111. }
  112. br.close();
  113. in.close();
  114. fs.close();
  115. }
  116. /**
  117. * 读取hdfs中文件的指定偏移量范围的内容
  118. *
  119. *
  120. *
  121. * @throws IOException
  122. * @throws IllegalArgumentException
  123. */
  124. @Test
  125. public void testRandomReadData() throws IllegalArgumentException, IOException {
  126. FSDataInputStream in = fs.open(new Path("/xx.dat"));
  127. // 将读取的起始位置进行指定
  128. in.seek(12);
  129. // 读16个字节
  130. byte[] buf = new byte[16];
  131. in.read(buf);
  132. System.out.println(new String(buf));
  133. in.close();
  134. fs.close();
  135. }
  136. /**
  137. * 往hdfs中的文件写内容
  138. *
  139. * @throws IOException
  140. * @throws IllegalArgumentException
  141. */
  142. @Test
  143. public void testWriteData() throws IllegalArgumentException, IOException {
  144. FSDataOutputStream out = fs.create(new Path("/zz.jpg"), false);
  145. // D:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg
  146. FileInputStream in = new FileInputStream("D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg");
  147. byte[] buf = new byte[1024];
  148. int read = 0;
  149. while ((read = in.read(buf)) != -1) {
  150. out.write(buf,0,read);
  151. }
  152. in.close();
  153. out.close();
  154. fs.close();
  155. }
  156. }

 

练习:从一个文件里面不断地采集日志上传到hdfs里面

1.流程介绍

---启动一个定时任务

    --定时探测日志原目录

    --获取文件上传到一个待上传的临时目录

    --逐一上传到hdfs目标路径,同时移动到备份目录里

--启动一个定时任务:

    --探测备份目录中的备份数据是否已经超出,如果超出就删除

 

 主类为:

  1. import java.util.Timer;
  2. public class DataCollectMain {
  3. public static void main(String[] args) {
  4. Timer timer = new Timer();
  5. //第一个为task类,第二个开始时间 第三个没隔多久执行一次
  6. timer.schedule(new CollectTask(), 0, 60*60*1000L);
  7. timer.schedule(new BackupCleanTask(), 0, 60*60*1000L);
  8. }
  9. }

CollectTask类:

这个类要继承TimerTask,重写run方法,主要内容就是不断收集日志文件

  1. package cn.edu360.hdfs.datacollect;
  2. import java.io.File;
  3. import java.io.FilenameFilter;
  4. import java.net.URI;
  5. import java.text.SimpleDateFormat;
  6. import java.util.Arrays;
  7. import java.util.Date;
  8. import java.util.Properties;
  9. import java.util.TimerTask;
  10. import java.util.UUID;
  11. import org.apache.commons.io.FileUtils;
  12. import org.apache.hadoop.conf.Configuration;
  13. import org.apache.hadoop.fs.FileSystem;
  14. import org.apache.hadoop.fs.Path;
  15. import org.apache.log4j.Logger;
  16. public class CollectTask extends TimerTask {
  17. @Override
  18. public void run() {
  19. try {
  20. // 获取配置参数
  21. Properties props = PropertyHolderLazy.getProps();
  22. // 获取本次采集时的日期
  23. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
  24. String day = sdf.format(new Date());
  25. File srcDir = new File("d:/logs/accesslog");
  26. // 列出日志源目录中需要采集的文件
  27. //里面传了一个文件过滤器,重写accept方法,return true就要
  28. File[] listFiles = srcDir.listFiles(new FilenameFilter() {
  29. @Override
  30. public boolean accept(File dir, String name) {
  31. if (name.startsWith("access.log")) {
  32. return true;
  33. }
  34. return false;
  35. }
  36. });
  37. // 将要采集的文件移动到待上传临时目录
  38. File toUploadDir = new File("d:/logs/toupload");
  39. for (File file : listFiles) {
  40. //这里如果是 file.renameTo(toUploadDir)是不对的,因为会生成一个toupload的文件而不是文件夹
  41. //要用renameTo的话你要自己加上文件的新名字比较麻烦
  42. //用FileUtiles是对file操作的一些工具类
  43. //第一个目标文件,第二个路径,第三个是否存在覆盖
  44. FileUtils.moveFileToDirectory(file, toUploadDir, true);
  45. }
  46. // 构造一个HDFS的客户端对象
  47. FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"), new Configuration(), "root");
  48. File[] toUploadFiles = toUploadDir.listFiles();
  49. // 检查HDFS中的日期目录是否存在,如果不存在,则创建
  50. Path hdfsDestPath = new Path("/logs" + day);
  51. if (!fs.exists(hdfsDestPath)) {
  52. fs.mkdirs(hdfsDestPath);
  53. }
  54. // 检查本地的备份目录是否存在,如果不存在,则创建
  55. File backupDir = new File("d:/logs/backup" + day + "/");
  56. if (!backupDir.exists()) {
  57. backupDir.mkdirs();
  58. }
  59. for (File file : toUploadFiles) {
  60. // 传输文件到HDFS并改名access_log_
  61. fs.copyFromLocalFile(new Path(file.getAbsolutePath()), new Path("/logs"+day+"/access_log_"+UUID.randomUUID()+".log"));
  62. // 将传输完成的文件移动到备份目录
  63. //注意这里依然不要用renameTo
  64. FileUtils.moveFileToDirectory(file, backupDir, true);
  65. }
  66. } catch (Exception e) {
  67. e.printStackTrace();
  68. }
  69. }
  1. /**
  2. * 读取hdfs中的文件的内容
  3. *
  4. * @throws IOException
  5. * @throws IllegalArgumentException
  6. */
  7. @Test
  8. public void testReadData() throws IllegalArgumentException, IOException {
  9. FSDataInputStream in = fs.open(new Path("/test.txt"));
  10. BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8"));
  11. String line = null;
  12. while ((line = br.readLine()) != null) {
  13. System.out.println(line);
  14. }
  15. br.close();
  16. in.close();
  17. fs.close();
  18. }
  19. /**
  20. * 读取hdfs中文件的指定偏移量范围的内容
  21. *
  22. *
  23. * 作业题:用本例中的知识,实现读取一个文本文件中的指定BLOCK块中的所有数据
  24. *
  25. * @throws IOException
  26. * @throws IllegalArgumentException
  27. */
  28. @Test
  29. public void testRandomReadData() throws IllegalArgumentException, IOException {
  30. FSDataInputStream in = fs.open(new Path("/xx.dat"));
  31. // 将读取的起始位置进行指定
  32. in.seek(12);
  33. // 读16个字节
  34. byte[] buf = new byte[16];
  35. in.read(buf);
  36. System.out.println(new String(buf));
  37. in.close();
  38. fs.close();
  39. }
  40. /**
  41. * 往hdfs中的文件写内容
  42. *
  43. * @throws IOException
  44. * @throws IllegalArgumentException
  45. */
  46. @Test
  47. public void testWriteData() throws IllegalArgumentException, IOException {
  48. FSDataOutputStream out = fs.create(new Path("/zz.jpg"), false);
  49. // D:\images\006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg
  50. FileInputStream in = new FileInputStream("D:/images/006l0mbogy1fhehjb6ikoj30ku0ku76b.jpg");
  51. byte[] buf = new byte[1024];
  52. int read = 0;
  53. while ((read = in.read(buf)) != -1) {
  54. out.write(buf,0,read);
  55. }
  56. in.close();
  57. out.close();
  58. fs.close();
  59. }
  1.  

 

  1. }

BackupCleanTask类

  1. package cn.edu360.hdfs.datacollect;
  2. import java.io.File;
  3. import java.text.SimpleDateFormat;
  4. import java.util.Date;
  5. import java.util.TimerTask;
  6. import org.apache.commons.io.FileUtils;
  7. public class BackupCleanTask extends TimerTask {
  8. @Override
  9. public void run() {
  10. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
  11. long now = new Date().getTime();
  12. try {
  13. // 探测本地备份目录
  14. File backupBaseDir = new File("d:/logs/backup/");
  15. File[] dayBackDir = backupBaseDir.listFiles();
  16. // 判断备份日期子目录是否已超24小时
  17. for (File dir : dayBackDir) {
  18. long time = sdf.parse(dir.getName()).getTime();
  19. if(now-time>24*60*60*1000L){
  20. FileUtils.deleteDirectory(dir);
  21. }
  22. }
  23. } catch (Exception e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }

 

hdfs中namenode中储存元数据(对数据的描述信息)是在内存中以树的形式储存的,并且每隔一段时间都会把这些元数据序列化到磁盘中。序列化的东西在磁盘中叫 fsimage文件。

元数据可能会很大很大,所以只能是定期的序列化

问题1:序列化的时候,发生了元数据的修改怎么办

答:namenode会把每次用户的操作都记录下来,记录成日志文件,存在edits日志文件中

其中edits日志文件也会像log4j滚动日志文件一样,当文件太大的时候会另起一个文件并改名字

问题2:当edits文件太多的时候,一次宕机也会花大量的时间从edits里恢复,怎么办

答:会定期吧edits文件重放fsimage文件,并记录edits的编号,把那些重放过的日志文件给删除。这样也相当于重新序列化了,

所以namenode并不会做这样的事情,是由secondary node做的,他会定期吧namenode的fsimage文件和edits文件下载下来

并把fsimage文件反序列化,并且读日志文件更新元数据,然后序列化到磁盘,然后把他上传给namenode。

这个机制叫做checkpoint机制

这里secondarynode 相当一一个小秘书

 

 

 

 

 

 额外知识点

注意,在windows里面不要写有些路径不要写绝对路径,因为程序放到linux下面可能会找不到,因此报错

 一般使用class加载器,这样当这个class加载的时候就会知道这个class在哪

类加载器的一些使用例子

比如我加载一个配置文件,为了避免出现绝对路径,我们可以是用类加载器

  1.      Properties props = new Properties();
  2. //加载配置文件,这样写的目的是为了避免在windows里出现绝对路径,用类加载器,再把文件传化成流
  3. props.load(HdfsWordcount.class.getClassLoader().getResourceAsStream("job.properties"));

 

而对于一些功能性的类,我们最好在写逻辑的时候也不要直接去导入这个包,而是使用Class.forName

  1. //这样不直接导入这个包,直接用类加载器,是面向接口编程的一种思想。这里我并不是在开始import xxxx.Mapper,这里Mapper是一个接口,这里我用了多态
  2. Class<?> mapper_class = Class.forName(props.getProperty("MAPPER_CLASS"));
  3. Mapper mapper = (Mapper) mapper_class.newInstance();

 

单例模式

https://www.cnblogs.com/crazy-wang-android/p/9054771.html

只有个一实例,必须自己创建自己这个实例,必须为别人提供这个实例

 

 

 饿汉式单例:就算没有人调用这个class,他也会加载进去;

如对于一个配置文件的加载

  1. import java.util.Properties;
  2. /**
  3. * 单例设计模式,方式一: 饿汉式单例
  4. *
  5. */
  6. public class PropertyHolderHungery {
  7. private static Properties prop = new Properties();
  8. static {
  9. try {
  10. //将一个文件prop.load(stram)
  11. //这里面如果传一个IO流不好,因为要用到绝对路径,使用了类加载器 这种不管有没有使用这个类都会加载
  12. prop.load(PropertyHolderHungery.class.getClassLoader().getResourceAsStream("collect.properties"));
  13. } catch (Exception e) {
  14. }
  15. }
  16. public static Properties getProps() throws Exception {
  17. return prop;
  18. }
  19. }

 懒汉式:只有调用的时候才会有,但会有线程安全问题

  1. /**
  2. * 单例模式:懒汉式——考虑了线程安全
  3. * */
  4.  
  5. public class PropertyHolderLazy {
  6. private static Properties prop = null;
  7. public static Properties getProps() throws Exception {
  8. if (prop == null) {
  9. synchronized (PropertyHolderLazy.class) {
  10. if (prop == null) {
  11. prop = new Properties();
  12. prop.load(PropertyHolderLazy.class.getClassLoader().getResourceAsStream("collect.properties"));
  13. }
  14. }
  15. }
  16. return prop;
  17. }
  18. }

 

原文链接:http://www.cnblogs.com/wpbing/p/11233399.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号