哎,最近公司又让我做flex的东西。也没什么时间搞Hadoop了。这里先把这段时间的东西做一个简单的总结。
思路:把本地的类编译打包->上传到Hadoop集群环境->运行。
上述属于废话,网上的相关内容很多。
Hadoop配置方面的就不多说了,baidu一下你就知道。
为了达到远程开发的目的,之前在配置过程中犯了很多错误。主要是masters里的问题,还有我的ubuntu的hosts文件总出问题。
另外,又简单看了看hadoop源码,唯一的感觉就是到处都是配置项。
这里还是主要是贴代码吧。
实现:还以WordCount为例。
代码如下:
WordCount.java
package org.yangzc.hadoop.demo;
import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.MapFile.Reader;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.yangzc.hadoop.tools.FileTools;
import org.yangzc.hadoop.tools.JarTools;
import org.yangzc.hadoop.tools.JobTools;
/**
* 测试用例
* @author yangzc
*
*/
public class WordCount {
public static class TokenizerMapper extends
Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileTools.copyToRemote(conf, "C:/Documents and Settings/Administrator/桌面/test.txt", "hdfs://ubuntu:9000/usr/devsoft/tmp/ss1.txt");
// FileTools.copyToLocal(conf, "C:/Documents and Settings/Administrator/桌面/indexhaha.txt", "hdfs://ubuntu:9000/usr/devsoft/tmp/e8e035dcd123404fa8cf8f132fa37e4a");
JobTools.addClassPath("D:/workspace/Hadoop/conf");
Thread.currentThread().setContextClassLoader(JobTools.getClassLoader());
File jarpath = JarTools.makeJar("bin");
conf.set("mapred.jar", jarpath.toString());
Job job = new Job(conf, "word count");
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(
"hdfs://ubuntu:9000/usr/devsoft/tmp/ss1.txt"));
Path destpath = new Path(
"hdfs://ubuntu:9000/usr/devsoft/tmp/" + UUID.randomUUID().toString().replace("-", ""));
FileOutputFormat.setOutputPath(job, destpath);
Path rtnpath = FileOutputFormat.getOutputPath(job);
System.out.println(rtnpath);
job.waitForCompletion(false);
// FileTools.copyDirToLocal(conf, "C:/Documents and Settings/Administrator/桌面", destpath.toString());
Reader reader = new Reader(FileSystem.get(conf),destpath.toString()+"/part-r-00000", conf);
Writable wa = reader.get(new Text("hao"), new IntWritable(1));
System.out.println(wa.toString());
}
}
帮助类:
FileTools.java
package org.yangzc.hadoop.tools;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class FileTools {
public static List<File> getFileList(String parent){
List<File> filelst = new ArrayList<File>();
//取得文件列表
getList(filelst, parent);
return filelst;
}
private static void getList(List<File> filelst, String parent){
File p = new File(parent);
if(!p.exists())return;
if(p.isDirectory()){
File clst[] = p.listFiles();
if(clst == null || clst.length == 0)return;
for(File f: clst){
getList(filelst, f.getAbsolutePath());
}
}else{
filelst.add(p);
}
}
public static void copyToRemote(Configuration conf, String local, String dest) throws IOException{
FileSystem fs = FileSystem.get(conf);
fs.copyFromLocalFile(new Path(local), new Path(dest));
fs.close();
}
public static void copyToLocal(Configuration conf, String local, String dest) throws IOException{
FileSystem fs = FileSystem.get(conf);
FSDataInputStream fis = fs.open(new Path(dest));
FileOutputStream fos = new FileOutputStream(local);
byte buf[] = new byte[1024];
int len = -1;
while((len = fis.read(buf, 0, 1024)) != -1){
fos.write(buf, 0, len);
}
fos.flush();
fos.close();
fis.close();
fs.close();
}
public static void copyDirToLocal(Configuration conf, String localdir, String destdir) throws IOException{
if(!new File(localdir).exists())new File(localdir).mkdirs();
FileSystem fs = FileSystem.get(conf);
FileStatus filestatus[] = fs.listStatus(new Path(destdir));
for(FileStatus f: filestatus){
Path path = f.getPath();
String local = localdir + "/" + path.getName();
try{
copyToLocal(conf, local, path.toString());
}catch(Exception e){
System.out.println("read remote file error!!!");
}
}
}
}
JarTools.java
package org.yangzc.hadoop.tools;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
public class JarTools {
public static File makeJar(String classPath) throws Exception{
if(classPath == null || "".equals(classPath))throw new Exception("classPath不能为空!!!");
JarOutputStream jos = null;
try {
Manifest manifest = new Manifest();
manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
//创建临时文件
final File jarfile = File.createTempFile("tmp_", ".jar", new File(System.getProperty("java.io.tmpdir")));
//注册关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
jarfile.deleteOnExit();
}
});
FileOutputStream fis = new FileOutputStream(jarfile);
jos = new JarOutputStream(fis, manifest);
File file = new File(classPath);
if(!file.isDirectory())return null;
List<File> files = FileTools.getFileList(classPath);
for(File f: files){
String filepath = f.getAbsolutePath().replace(new File(classPath).getAbsolutePath(), "");
if(filepath.startsWith("\\")){
filepath = filepath.substring(1);
}
JarEntry entry = new JarEntry(filepath.replace("\\", "/"));
jos.putNextEntry(entry);
FileInputStream is = new FileInputStream(f);
byte buf[] = new byte[1024];
int len = -1;
while((len = is.read(buf, 0, 1024)) != -1){
jos.write(buf, 0 , len);
}
is.close();
}
jos.flush();
return jarfile;
} catch (IOException e) {
e.printStackTrace();
} finally{
if(jos != null)jos.close();
}
return null;
}
public static void main(String[] args) {
try {
JarTools.makeJar("D:/workspace/JarMaker/bin/");
} catch (Exception e) {
e.printStackTrace();
}
}
}
JobTools.java
package org.yangzc.hadoop.tools;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapreduce.Job;
public class JobTools {
private static List<URL> classPaths = new ArrayList<URL>();
public static void addClassPath(String classPath){
if(classPath != null && !"".equals(classPath)){
File f = new File(classPath);
if(!f.exists())return;
try {
classPaths.add(f.getCanonicalFile().toURI().toURL());
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void addJarDir(String lib){
File file = new File(lib);
if(!file.exists())return;
if(!file.isDirectory())return;
File fs[] = file.listFiles();
if(fs == null || fs.length == 0)return;
for(File f: fs){
addClassPath(f.getAbsolutePath());
}
}
public static URLClassLoader getClassLoader(){
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
if(classLoader == null)classLoader = JobTools.getClassLoader();
if(classLoader == null)classLoader = ClassLoader.getSystemClassLoader();
return new URLClassLoader(classPaths.toArray(new URL[0]), classLoader);
}
public static boolean runJob(Job job){
try {
return job.waitForCompletion(true);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return false;
}
public static void main(String[] args) {
}
}
分享到:
相关推荐
SpringBoot集成hadoop开发环境(复杂版的WordCount)前言环境清单创建SpringBoot项目创建包创建yml添加集群主机名映射hadoop配置文件环境变量HADOOP_HOME编写代码添加hadoop依赖jar包编译项目造数据IDEA远程提交...
eclipse连接远程hadoop集群开发时权限不足问题解决方案 (2).pdfeclipse连接远程hadoop集群开发时权限不足问题解决方案 (2).pdf
Hadoop伪分布式部署文档(包括服务器伪分布式部署,本地hadoop开发环境部署,eclipse远程连接Hadoop服务器,实测无数遍,综合网上无数文档)
Hadoop 开发环境搭建第一篇: 1、是用Virtual Box 系统使用的是Redhat ,linux不熟的朋友们可以顺便学习一下linux, 2、环境配置包括防火墙关闭,IP分配,更改域名等 3、服务软件:远程服务SSH等安装
Hadoop系统安装运行与程序开发 1.单机Hadoop系统安装基本步骤 2.集群Hadoop系统安装基本步骤 3.Hadoop集群远程作业提交与执行 4.Hadoop MapReduce程序开发
本文档主要讲述的是RedHat linux下安装hadoop 0.20.2, 并在windows下远程连接此hadoop,开发调试;感兴趣的朋友可以过来看看。 Hadoop是一个由Apache基金会所开发的分布式系统基础架构。 资源太大,传百度网盘了,...
Hadoop3.3.0在windows下部署开发环境或者远程开发hadoop集群,对应的 hadoop.dll和winutils.exe以及整个bin目录 ,以及安装方法 亲测可用,有什么问题可以在评论描述
不懂运行,下载完可以私聊问,可远程教学 该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! <项目介绍> 1、该资源内项目代码都经过测试运行成功,...
002 配置虚拟机IP地址和如何使用远程工具SecureCRT 003 Linux 环境下基本命令使用及Linux系统中文件的类型和权限 004 Linux 环境下基本命令讲解二 005 Linux 系统远程FTP工具与桌面工具XManager使用和培养三大能力 ...
想要在windows上运行hadoop,学习大数据的小伙伴福利来了
Windows下Eclispe远程开发Mapreduce程序
本文档为自己的实践过程描述,windows+eclipse+hadoop plugin连接远程主机环境搭建
hadoop Common – 是hadoop的核心,包括文件系统、远程调用RPC的序列化函数。 HDSF : 提供高吞吐量的可靠分布式文件系统是 GFS的开源实现。 •Hadoop的文件系统。必须通过hadoop fs 命令来读取。支持分布式。 ...
基于Hadoop开发出该系统。系统分为前台和后台两部分,前台采用 JSP 和Js编写界面和前端逻辑校验,后台开发主要采用Java语言,使用spring、bootmetro-master等Java EE开发框架,同时使用 MySQL、HDFS Java API等实现...
建议配合主机的IntelliJ-IDEA的Bigdata拓展工具以及SSH服务进行远程操控使用。 3.本机密码都是“hadoop”。 4.mysql5.7用户名为"root",密码为"hadoop"。 5.使用hadoop集群前,需要根据本地虚拟机的IP地址修改/etc/...
hadoop-eclipse-plugin-2.8.2.jar hadoop eclipse环境开发调试所用
eclipse连接远程hadoop集群开发时0700问题解决方案。修改源码,重新编译后hadoop-core-1.2.0
基于Docker构建的Hadoop开发测试环境,包含Hadoop,Hive,HBase,Spark+源代码+文档说明 基于Docker的Hadoop开发测试环境使用说明 ## 0.内容 1. 基本软件环境介绍 2. 使用方法简介 3. 已知问题 4. 注意事项 ## 1....
主要介绍了windows 32位eclipse远程hadoop开发环境搭建的相关资料,需要的朋友可以参考下