博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop中一些Java Api操作(23)
阅读量:6455 次
发布时间:2019-06-23

本文共 10447 字,大约阅读时间需要 34 分钟。

hot3.png

Hadoop基本文件系统操作:

    1.首先从本地文件系统将一个文件复制到HDFS:
        hadoop fs-copyFromLocal input/docs/quangle.txthdfs://localhost/user/tom/quangle.txt
        把文件复制回本地文件系统,并检查是否一致:
            hadoop fs -copyToLocal quangle.txt quangle.copy.txt
            md5 input/docs/quangle.txt quangle.copy.txt
            输出:
                MD5(input/docs/quangle.txt)=a16f231da6b05e2ba7a339320e7dacd9
                MD5(quangle.copy.txt)=a16f231da6b05e2ba7a339320e7dacd9
            由于MD5键值相同,表明这个文件在HDFS之旅中得以幸存并保存完整.
    2.从HadoopURL中读取数据:
        InputStreamin=null;
        try{
            in=new URL("hdfs://host/path").openStream();
        }finally{
            IOUtils.closeStream(in);
        }
        通过URLStreamHandler实现以标准输出方式显示Hadoop文件系统的文件
        publicclassURLCat(){
            static{
                URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
            }
        }
        注:从0.21.0版本开始,加入了一个名为FileContext的文件系统接口,该接口能够更好地处理多文件系统问题,并且该接口更简明、一致.
        public static void main(String[] args)throws Exception{
        InputStreamin = null;
            try{
                in = new URL(args[0].openStream());
                IOUtils.copyBytes(in,System.out,4096,false);
            }finally{
                IOUtils.closeStream(in);
            }
        }
        调用Hadoop中简洁的IOUtils类,并在finally字句夅关闭数据流,同时也可以在输入流和输出流之间复制数据(System.out),copyBytes方法的最后两个参数,第一个用于设置复制的缓冲区大小,第二个用户设置复制结果后是否关闭数据流.这里选择自行关闭输入流,因而System.out不关闭输入流.
        运行示例:
            hadoop URLCat hdfs://localhost/user/tom/quangle.txt
    3.通过FileSystem API读取数据
        FileSystem是一个通用的文件系统API,所以第一步是检索我们需要使用的文件系统实例,这里是HDFS.
        获取FileSystem实例有两种静态工厂方法:
            public static FileSystem get(Configurationconf)throws IOException
            public static FileSystem get(URIuri,Configurationconf)throws IOException
        Configuration对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现(如conf/core-site.xml).
        第一个方法返回的是默认文件系统(在conf/core-site.xml中指定的,如果没有指定,则使用默认的本地文件系统).
        第二个方法通过给定的URI方案和权限来确定要使用的文件系统,如果给定URI中没有指定方案,则返回默认文件系统.
        有了FileSystem实例之后,我们调用open()函数来获取文件的输入流:
            public FSDataInputStream open(Pathf)throws IOException
            public abstract FSDataInputStream open(Pathf,intbufferSize)throws IOException
        第一个方法使用默认的的缓冲区大小4kb.
        直接使用FileSystem以标准输出格式显示Hadoop文件系统中的文件
        public class FileSystemCat(){
            public static void main(String[] args)throws Exception{
                String uri = args[0];
                Configuration conf= new Configuration();
                FileSystem fs = FileSystem.get(URI.create(uri),conf);
                InputStream in = null;
                try{
                    in = fs.open(new Path(uri));
                    IOUtils.copyBytes(in,System.out.4096,false);
                }finally{
                    IOUtils.closeStream(in);
                }
            }
        }
        程序运行结果如下:
            hadoop FileSystemCat hdfs://localhost/user/tom/quagnle.txt
    4.写入数据
        FileSystem类有一系列创建文件的方法.最简单的方法是给准备创建的文件制定一个Path对象.然后返回一个用于写入数据的输出流:
        public FSDataOutputStream create(Path f)throws IOException
        注意:create()方法能够为需要写入且当前不存在的文件创建父目录.尽管这样很方便,但有时并不希望这样.如果你希望不存在父目录就发生文件写入失败,则应该先调用exists()方法检查父目录是否存在.
        还有一个重载方法Propressable,用于传递回调接口,如此一来,可以把数据写入数据节点的进度通知到你的应用:
            public interface Progressable{
                public void progress();
            }
        另外一种新建文件的方法,是使用append()方法在一个一有文件末尾追加数据(还存在一些其他重载版本):
            public FSDataOutputStream append(Path f)throws IOException
        该追加操作允许一个writer打开文件后访问该文件的最后偏移量处追加数据.有了这个API,某些应用可以创建无边界文件.
        例如:
            日志文件可以在机器重启后在已有文件后面继续追加数据.该追加操作是可选的,并非所有Hadoop文件系统都实现了该操作.
            例如,HDFS支持追加,但S3文件系统就不支持.
    5.将本地文件复制到Hadoop文件系统:
        public class FileCopyWithProgress{
            public static void main(String[] args){
                String localSrc = args[0];
                String dst = args[1];
                InputStream in = new BufferInputStream(newFileInputStream(localSrc));
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(URI.create(dst),conf);
                OutputStream out = fs.create(new Path(dst),newProgressable(){
                    public void progress(){
                        System.out.print(".");
                    }
                });
                IOUtils.copyBytes(in,out,4096,true);
            }
        }
    6.FSDataInputStream
        public class FSdataInputStream extends DataInputStream implements Seekable,PositionedReadable{}
        Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量(getPos())的查询方法:
            public interface Seekable{
                void seek(long pos)throws IOException;
                long getPos()throws IOException;
                boolean seekToNewSource(long targetPos)throws IOException;
            }
        使用seek方法,将Hadoop文件系统中的一个文件标准输出上显示两次
            public class FileSystem DoubleCat{
                public static void main(String[] args)throws Exception{
                    String uri =args[0];
                    Configuration conf = new Configuration();
                    FileSystem fs = FileSystem.get(URI.create(uri),conf);
                    FSDataInputStream in=null;
                    try{
                        in = fs.open(new Path(uri));
                        IOUtils.copyByste(in,System.out,4096,false);
                        in.seek(0);
                        IOUtils.copyBytes(in,System.out,4096,false);
                    }finally{
                        IOUtils.closeStream(in);
                    }
                }
            }
        在一个小文件上运行的结果如下:
            hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
            FSDataInputStream类也实现了PositionedReadable接口,从一个指定偏移量处理读取文件的一部分:
            public interface PositionedReadable{
                public int read(long position,byte[] buffer,int offset,int length)throws IOException;
                public void readFully(long position,byte[] buffer,int offset,int length)throws IOException;
                public void readFully(long position,byte[] buffer)throws IOException;
            }
            read()方法从文件的指定position处读取至多为length字节的数据并存入缓冲区buffer的指定偏离量offset处.返回值是实际读到的字节数:调用者需要检查这个值,它有可能小于指定的length长度.
            readFully()方法将指定length长度的字节数数据读取到buffer中(或在只接受buffer字节数组的版本中,读取buffer.length长度字节数据),除非已经读到文件末尾,这种情况下将抛出EOFException异常.
            所有这些方法会保留文件当前偏移量,并且是线程安全的,因此它们提供了再读取文件-----可能是元数据----的主题时访问文件的其他部分的便利方法.事实上,这只是按照以下模式实现的Seekable接口:
                lon goldPos = getPos();
                try{
                    seek(position);
                }finally{
                    seek(oldPos)
                }
            seek()方法是一个相对高开销的操作,需要慎重使用.建议用流数据来构建应用的访问模式(如使用MapReduce),而非执行大量的seek()方法.
    7.FSDataOutputStream对象
        FileSystem实例的create()方法返回FSDataOutputStream对象,与FSDataInputSream类相似,它也有一个检查文件当前位置的方法:
        public class FSDataOutputStream extends DataOutputStream implements Syncable{
            public long getPos()throwsIOException{
                //
            }
        }
        但与FSDataInputStream类不同的是,FSDataOutputStream类不允许在文件中定位.这是因为HDFS只允许对一个已打开的文件顺序写入,或在现有文件的末尾追加数据.它不支持在除文件末尾之外的其他位置进行写入,因此,写入时定位就没有什么意义.
    8.FileSystem实例提供了创建目录的方法:
        public boolean mkdirs(Path f)throws IOException
        这个方法可以一次性创建所有必要但还没有的父目录,就像java.io.File类的mkdirs()方法.如果目录都已经创建成功,则返回true.
        通常,你不需要显式创建一个目录,因为调用create()方法写入文件时会自动创建父目录.
Hadoop查询文件系统:
    1.文件元数据:FileStatus
        任何文件系统的一个重要特征都是提供其目录结构浏览和检索它所存文件盒目录相关信息的功能.
        FileStatus类封装了文件系统中文件和目录的元数据,包括文件长度,块大小,备份,修改时间,所有者以及权限信息.
        FileSystem的getFileStatus()方法用于获取文件或目录的FileStatus对象.
        展示文件状态信息:
        public class ShowFileStatusTest{
            private MiniDFSCluster cluster;
            private FileSystem fs;
            @Before
            public void setUp()throws IOException{
                Configuration conf = new Configuration();
                if(System.getProperty("text.build.data")==null){
                    System.setProperty("test.build,data","/tmp");
                }
                cluster=newmMiniDFSCluster(conf,1,true,null);
                fs=cluster.getFileSystem();
                OutputStream out=fs.create(newPath("/dir/file"));
                out.write("content".getBytes("UTF-8"));
                out.close();
            }
            
            publicvoidtearDown(()throwsIOException{
                if(fs!=null){fs.close();}
                if(cluster!=null){cluster.shutdown();}
            }    
            @Test(expected=FileNotFoundException.class)
            public void throwsFileNotFoundNonExistentFile()throwsIOException{
                fs.getFileStatus(newPath("no-such-file"));
            }
            
            public void fileStatusForFile()throws IOException{
                Path file = new Path("/dir/file");
                FileStatus stat = fs.getFileStatus(file);
                assertThat(stat.getPath().toUri().getPath(),is("/dir/file"));
                assertThat(stat.isDir(),is(false));
                assertThat(stat.getLen(),is(7L));
                assertThat(stat.getModificationTime(),is(lessThanOrEqualTO(System.currentTimeMillis())));
                assertThat(stat.getReplication(),is(short)1);
                assertThat(stat.getBlockSize(),is(64*1024*1024L));
                assertThat(stat.getOwner(),is("tom"));
                assertThat(stat.getGroup(),is("supergroup"));
                assertThat(stat.getPermission().toString(),is("rw-r--r--"));
            }
            
            public void fileStatusForDirectory()throws IOException{
                Path dir = new Path("/dir")
                FileStatus stat = fs.getFileStatus(dir);
                assertThat(stat.getPath().toUri().getPath(),is("/dir"));
                assertThat(stat.isDir(),is(true));//是否是目录
                assertThat(stat.getLen(),is(0L));//文件长度
                assertThat(stat.getModificationTime(),is(lessThanOrEqualTO(System.currentTimeMillis())));//最后修改时间
                assertThat(stat.getReplication(),is(short)0);//备份数
                assertThat(stat.getBlockSize(),is(0L));//块大小
                assertThat(stat.getOwner(),is("tom"));//用户
                assertThat(stat.getGroup(),is("supergroup"));//组
                assertThat(stat.getPermission().toString(),is("rwxr-xrr-x"));//权限
            }
        }
        如果文件或目录均不存在,则会抛出FileNotFoundException异常.但是,如果只需检查文件或目录是否存在,那么调用exists()方法会更方便:
        public boolean exists(Path f)throws IOException
        
    2.列出文件:
        查找一个文件或目录的信息很实用,但通常你还需要能够列出目录的内容.这就是FileSystem的listStatus()方法的功能:
            public FileStatus[] listStatus(Path f)throws IOException
            public FileStatus[] listStatus(Path f,PathFilter fileter)throws IOException
            public FileStatus[] listStatus(Path[] files)throws IOException
            public FileStatus[] listStatus(Path[] files,PathFilter fileter)throws IOException
        当传入的参数是一个文件时,它会简单转变成以数组方法返回长度为1的FileStatus对象.
        当传入参数是一个目录时,则返回0或多个FileStatus对象,表示此目录中包含的文件和目录.
        注意FileUtil中stat2Paths()方法的使用,它将一个FileStatus对象数组转换为Path对象数组.
        例:显示Hadoop文件系统中一组路径的文件信息
        public class ListStatus{
            public static void main(String[]args){
                String uri = args[0];
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(URI.create(uri),conf);
                Path[] paths = new Path[args.length];
                for(int i=0;i<paths.length;i++){
                    paths[i] = new Path(args[i]);
                }
                FileStatus[] status = fs.listStatus(paths);
                Path[] listedPaths = FileUtil.stat2Paths(status);
                for(Path p:listedPaths){
                    System.out.println(p);
                }
            }
        }
    3.文件模式
        在单个操作中处理一批文件,这是一个常见要求.举例来说,处理日志的MapReduce作业可能需要分析一个月内包含在大量目录中的日志文件.在一个表达式中使用通配符来匹配多个文件时比较方便的,无需列举每个文件盒目录来指定输入,该操作称为"通配".Hadoop为执行通配提供了两个FileSystem方法:
            public FileStatus[] globStatus(Path pathPattern)throws IOException
            public FileStatus[] globStatus(Path pathPattern,Path Filterfilter)throws IOException
            globStatus()方法返回与路径相匹配的所有文件的FileStatus对象数组,并按路径排序.PathFilter命令作为可选项可以进一步对匹配限制.
            Hadoop支持的通配符与Unixbash相同.
            通配符            名称            匹配
            *                星号            匹配0或多个字符
            ?                问号            匹配单一个字符
            [ab]            字符类            匹配{a,b}集合中的一个字符
            [^ab]            非字符类        匹配非{a,b}集合中的一个字符
            [a-b]            字符范围    匹配一个在{a,b}范围内的字符(包括ab),a在字典顺序上要小于或等于b
            [^a-b]            非字符范围        匹配一个不在{a,b}范围内的字符(包括ab),a在字典顺序上要小于等于b
            {a,b}            或选择            匹配包含a或b中的一个的表达式
            \c                转义字符        匹配元字符c
    4.PathFilter对象            通配符模式并不总能够精确地描述我们想要访问的文件集.比如,使用通配格式排除一个特定的文件就不太可能,FileSystem中的listStatus()和globStatus()方法提供了可选的PathFilter对象,使我们能够通过编程方式控制通配符:
        public interface PathFilter{
            boolean accept(Path path);
        }
    PathFilter与java.io.FileFilter一样,是Path对象而不是File对象.
    例子:用于排序匹配正则表达式路径的PathFilter
        public class RegexExcludePathFilter implements PathFilter{
            private final String regex;
            public RegexExcludePathFilter(String regex){
                this.regex = regex;
            }
            public boolean accept(Path path){
                return !path.toString().matchers(regex);
            }
        }
    这个过滤器只传递不匹配正则表达式的文件.我们将该过滤器与预先去除文件的通配符想结合:过滤器可优化结果.
    fs.globStatus(newPath("/2007/*/*"),newRegexExcludeFilter("^.*/2007/12/31$"));
    过滤器由Path表示,只能作用于文件名.不能针对文件的属性来构建过滤器.但是,通配符模式和正则表达式同样无法对文件属性进行匹配.例如,如果你将文件存储在按照日期排列的目录结构中,则可以根据Pathfilter在给定的时间范围内选出文件.
    4.删除数据
        使用FileSystem的delete()方法可以永久性删除文件或目录.
        public boolean delete(Path f,boolean recursive)throwsIOException
        如果f是一个文件或空目录,那么recursive的值就会被忽略.只有在recrusive值为true时,一个非空目录及其内容才回被删除(否则会抛出IOException异常)

                                                                                                                                Name:Xr

                                                                                                                                Date:2014-03-12 21:54

转载于:https://my.oschina.net/Xiao629/blog/207695

你可能感兴趣的文章
IEnumerable<T>
查看>>
IntelliJ IDEA 注册码
查看>>
linux 上面配置apache2的虚拟目录
查看>>
Linux学习总结 (未完待续...)
查看>>
NoSQL数据库探讨 - 为什么要用非关系数据库?
查看>>
String字符串的截取
查看>>
switch函数——Gevent源码分析
查看>>
Spring MVC简单原理
查看>>
DynamoDB Local for Desktop Development
查看>>
ANDROID的SENSOR相关信息
查看>>
laravel 使用QQ邮箱发送邮件
查看>>
用javascript验证哥德巴赫猜想
查看>>
Shell编程-环境变量配置文件
查看>>
(转)CSS浮动(float,clear)通俗讲解
查看>>
os.walk函数
查看>>
[Unity3d]DrawCall优化手记
查看>>
SQL Serever学习7——数据表2
查看>>
(转)Mac 下设置android NDK的环境
查看>>
dubbo问题总结
查看>>
Struts2和Spring MVC的区别
查看>>