Project

General

Profile

« Previous | Next » 

Revision 36940

Added by Marek Horst about 9 years ago

merging trunk changes with IIS-CDH-5.3.0 branch

View differences:

modules/icm-iis-common/branches/IIS-CDH-5.3.0/src/main/java/eu/dnetlib/iis/common/utils/EmptyDatastoreVerifierProcess.java
8 8
import java.util.Map;
9 9
import java.util.Properties;
10 10

  
11
import org.apache.hadoop.conf.Configuration;
11 12
import org.apache.hadoop.fs.FileSystem;
12 13

  
13
import eu.dnetlib.iis.core.java.HadoopContext;
14 14
import eu.dnetlib.iis.core.java.PortBindings;
15 15
import eu.dnetlib.iis.core.java.Ports;
16 16
import eu.dnetlib.iis.core.java.Process;
......
67 67
	}
68 68

  
69 69
	@Override
70
	public void run(PortBindings portBindings, HadoopContext context,
70
	public void run(PortBindings portBindings, Configuration conf,
71 71
			Map<String, String> parameters) throws Exception {
72 72
		CloseableIterator<?> closeableIt = DataStore.getReader(
73
				new FileSystemPath(FileSystem.get(context.getConfiguration()), 
73
				new FileSystemPath(FileSystem.get(conf), 
74 74
						portBindings.getInput().get(INPUT_PORT_NAME)));
75 75
		try {
76 76
			File file = new File(System.getProperty(OOZIE_ACTION_OUTPUT_FILENAME));
modules/icm-iis-common/branches/IIS-CDH-5.3.0/src/main/java/eu/dnetlib/iis/common/cache/CacheMetadataManagingProcess.java
10 10
import java.util.Map;
11 11
import java.util.Properties;
12 12

  
13
import org.apache.hadoop.conf.Configuration;
13 14
import org.apache.hadoop.fs.FSDataInputStream;
14 15
import org.apache.hadoop.fs.FSDataOutputStream;
15 16
import org.apache.hadoop.fs.FileSystem;
......
19 20
import com.google.gson.stream.JsonWriter;
20 21

  
21 22
import eu.dnetlib.iis.common.FsShellPermissions;
22
import eu.dnetlib.iis.core.java.HadoopContext;
23 23
import eu.dnetlib.iis.core.java.PortBindings;
24 24
import eu.dnetlib.iis.core.java.porttype.PortType;
25 25

  
......
79 79
		return Collections.emptyMap();
80 80
	}
81 81

  
82
	protected String getExistingCacheId(HadoopContext context, Map<String, String> parameters) throws IOException {
82
	protected String getExistingCacheId(Configuration conf, Map<String, String> parameters) throws IOException {
83 83
		if (parameters.containsKey(PARAM_CACHE_DIR)) {
84
			FileSystem fs = FileSystem.get(context.getConfiguration());
84
			FileSystem fs = FileSystem.get(conf);
85 85
			Path cacheFilePath = new Path(parameters.get(PARAM_CACHE_DIR), 
86 86
					DEFAULT_METAFILE_NAME);
87 87
			if (fs.exists(cacheFilePath)) {
......
106 106
		}
107 107
	}
108 108
	
109
	protected String generateNewCacheId(HadoopContext context, Map<String, String> parameters) throws IOException {
109
	protected String generateNewCacheId(Configuration conf, Map<String, String> parameters) throws IOException {
110 110
		if (parameters.containsKey(PARAM_CACHE_DIR)) {
111
			FileSystem fs = FileSystem.get(context.getConfiguration());
111
			FileSystem fs = FileSystem.get(conf);
112 112
			Path cacheFilePath = new Path(parameters.get(PARAM_CACHE_DIR), 
113 113
					DEFAULT_METAFILE_NAME);
114 114
			CacheMeta cachedMeta = null;
......
137 137
		}
138 138
	}
139 139
	
140
	protected void writeCacheId(HadoopContext context, Map<String, String> parameters) throws IOException {
140
	protected void writeCacheId(Configuration conf, Map<String, String> parameters) throws IOException {
141 141
		if (parameters.containsKey(PARAM_CACHE_DIR)) {
142 142
			if (parameters.containsKey(PARAM_ID)) {
143 143
				String cacheId = parameters.get(PARAM_ID);
144
				FileSystem fs = FileSystem.get(context.getConfiguration());
144
				FileSystem fs = FileSystem.get(conf);
145 145
				Path cacheFilePath = new Path(parameters.get(PARAM_CACHE_DIR), 
146 146
						DEFAULT_METAFILE_NAME);
147 147
				CacheMeta cachedMeta = null;
......
174 174
					outputStream.close();
175 175
//					changing file permission to +rw to allow writing for different users
176 176
					FsShellPermissions.changePermissions(fs, 
177
							context.getConfiguration(), 
177
							conf, 
178 178
							FsShellPermissions.Op.CHMOD, 
179 179
							false, "0666", cacheFilePath.toString());
180 180
				}
......
209 209
	}
210 210
	
211 211
	@Override
212
	public void run(PortBindings portBindings, HadoopContext context,
212
	public void run(PortBindings portBindings, Configuration conf,
213 213
			Map<String, String> parameters) throws Exception {
214 214
		String mode = parameters.get(PARAM_MODE);
215 215
		File file = new File(System.getProperty(OOZIE_ACTION_OUTPUT_FILENAME));
216 216
        Properties props = new Properties();
217 217
		if (MODE_READ_CURRENT_ID.equals(mode)) {
218 218
			props.setProperty(OUTPUT_PROPERTY_CACHE_ID, 
219
	        		getExistingCacheId(context, parameters));
219
	        		getExistingCacheId(conf, parameters));
220 220
		} else if (MODE_GENERATE_NEW_ID.equals(mode)) {
221 221
			props.setProperty(OUTPUT_PROPERTY_CACHE_ID, 
222
	        		generateNewCacheId(context, parameters));
222
	        		generateNewCacheId(conf, parameters));
223 223
		} else if (MODE_WRITE_ID.equals(mode)) {
224
			writeCacheId(context, parameters);
224
			writeCacheId(conf, parameters);
225 225
		} else {
226 226
			throw new RuntimeException("unsupported mode: " + mode);	
227 227
		}
modules/icm-iis-common/branches/IIS-CDH-5.3.0/src/main/java/eu/dnetlib/iis/common/oozie/property/ConditionalPropertySetter.java
7 7
import java.util.Map;
8 8
import java.util.Properties;
9 9

  
10
import eu.dnetlib.iis.core.java.HadoopContext;
10
import org.apache.hadoop.conf.Configuration;
11

  
11 12
import eu.dnetlib.iis.core.java.PortBindings;
12 13
import eu.dnetlib.iis.core.java.Process;
13 14
import eu.dnetlib.iis.core.java.porttype.PortType;
......
49 50
	}
50 51

  
51 52
	@Override
52
	public void run(PortBindings portBindings, HadoopContext context,
53
	public void run(PortBindings portBindings, Configuration conf,
53 54
			Map<String, String> parameters) throws Exception {
54 55

  
55 56
		String condition = parameters.get(PARAM_CONDITION);
modules/icm-iis-common/branches/IIS-CDH-5.3.0/src/main/java/eu/dnetlib/iis/common/oozie/property/JobPropertiesDumperProcess.java
1
package eu.dnetlib.iis.common.oozie.property;
2

  
3
import java.util.Collections;
4
import java.util.HashMap;
5
import java.util.Map;
6
import java.util.Properties;
7

  
8
import org.apache.hadoop.conf.Configuration;
9
import org.apache.hadoop.fs.FSDataOutputStream;
10
import org.apache.hadoop.fs.FileSystem;
11
import org.apache.hadoop.fs.Path;
12

  
13
import eu.dnetlib.iis.core.java.PortBindings;
14
import eu.dnetlib.iis.core.java.porttype.AnyPortType;
15
import eu.dnetlib.iis.core.java.porttype.PortType;
16

  
17
/**
18
 * Job properties dumper process. 
19
 * Writes all properties of the supplied job configuration to the job.properties file created in the directory bound to the output port.
20
 * 
21
 * @author mhorst
22
 *
23
 */
24
public class JobPropertiesDumperProcess implements eu.dnetlib.iis.core.java.Process {
25

  
26
	public static final String OUTPUT_PORT = "output";
27

  
28
	private static final Map<String, PortType> outputPorts = new HashMap<String, PortType>();
29
	
30
	{
31
		outputPorts.put(OUTPUT_PORT, new AnyPortType());
32
	}
33
	
34
	@Override
35
	public void run(PortBindings portBindings, Configuration conf,
36
			Map<String, String> parameters) throws Exception {
37
        Properties props = new Properties();
38
        for(Map.Entry<String,String> property : conf) {
39
        	props.setProperty(property.getKey(), 
40
        			property.getValue());
41
        }
42
		FileSystem fs = FileSystem.get(conf);
43
        FSDataOutputStream outputStream = fs.create(new Path(
44
        				portBindings.getOutput().get(OUTPUT_PORT),
45
        				"job.properties"), true);
46
        try {
47
        	props.store(outputStream, "");	
48
        } finally {
49
        	outputStream.close();	
50
        }	
51
	}
52

  
53
	@Override
54
	public Map<String, PortType> getInputPorts() {
55
		return Collections.emptyMap();
56
	}
57

  
58
	@Override
59
	public Map<String, PortType> getOutputPorts() {
60
		return outputPorts;
61
	}
62
	
63
}
0 64

  
modules/icm-iis-common/branches/IIS-CDH-5.3.0/src/main/java/eu/dnetlib/iis/common/oozie/property/OoziePropertiesDumperProcess.java
1
package eu.dnetlib.iis.common.oozie.property;
2

  
3
import java.util.Collections;
4
import java.util.HashMap;
5
import java.util.Map;
6
import java.util.Properties;
7

  
8
import org.apache.hadoop.conf.Configuration;
9
import org.apache.hadoop.fs.FSDataOutputStream;
10
import org.apache.hadoop.fs.FileSystem;
11
import org.apache.hadoop.fs.Path;
12

  
13
import eu.dnetlib.iis.core.java.PortBindings;
14
import eu.dnetlib.iis.core.java.porttype.AnyPortType;
15
import eu.dnetlib.iis.core.java.porttype.PortType;
16

  
17
/**
18
 * Oozie properties dumper process. 
19
 * Writes all properties loaded from the Oozie action configuration file (located via the oozie.action.conf.xml system property) to the job.properties file created in the directory bound to the output port.
20
 * 
21
 * @author mhorst
22
 *
23
 */
24
public class OoziePropertiesDumperProcess implements eu.dnetlib.iis.core.java.Process {
25

  
26
	public static final String OUTPUT_PORT = "output";
27

  
28
	private static final Map<String, PortType> outputPorts = new HashMap<String, PortType>();
29
	
30
	{
31
		outputPorts.put(OUTPUT_PORT, new AnyPortType());
32
	}
33
	
34
	@Override
35
	public void run(PortBindings portBindings, Configuration jobConf,
36
			Map<String, String> parameters) throws Exception {
37
        
38
        Configuration actionConf = new Configuration(false);
39
        String actionConfLocation = System.getProperty("oozie.action.conf.xml");
40
        System.out.println("loading properties from: " + actionConfLocation);
41
        actionConf.addResource(new Path("file:///", actionConfLocation)); 
42
        
43
        Properties props = new Properties();
44
        for(Map.Entry<String,String> property : actionConf) {
45
        	props.setProperty(property.getKey(), 
46
        			property.getValue());
47
        }
48
		FileSystem fs = FileSystem.get(actionConf);
49
        FSDataOutputStream outputStream = fs.create(new Path(
50
        				portBindings.getOutput().get(OUTPUT_PORT),
51
        				"job.properties"), true);
52
        try {
53
        	props.store(outputStream, "");	
54
        } finally {
55
        	outputStream.close();	
56
        }	
57
	}
58

  
59
	@Override
60
	public Map<String, PortType> getInputPorts() {
61
		return Collections.emptyMap();
62
	}
63

  
64
	@Override
65
	public Map<String, PortType> getOutputPorts() {
66
		return outputPorts;
67
	}
68
	
69
}
0 70

  
modules/icm-iis-common/branches/IIS-CDH-5.3.0/src/main/java/eu/dnetlib/iis/common/lock/LockManagingProcess.java
4 4
import java.util.Map;
5 5
import java.util.concurrent.Semaphore;
6 6

  
7
import org.apache.hadoop.conf.Configuration;
7 8
import org.apache.hadoop.ha.ZKFailoverController;
8 9
import org.apache.log4j.Logger;
9 10
import org.apache.zookeeper.CreateMode;
......
13 14
import org.apache.zookeeper.ZooDefs;
14 15
import org.apache.zookeeper.ZooKeeper;
15 16

  
16
import eu.dnetlib.iis.core.java.HadoopContext;
17 17
import eu.dnetlib.iis.core.java.PortBindings;
18 18
import eu.dnetlib.iis.core.java.porttype.PortType;
19 19

  
......
57 57
	}
58 58

  
59 59
	@Override
60
	public void run(PortBindings portBindings, HadoopContext context,
60
	public void run(PortBindings portBindings, Configuration conf,
61 61
			Map<String, String> parameters) throws Exception {
62 62
		
63 63
		if (!parameters.containsKey(PARAM_NODE_ID)) {
......
67 67
			throw new Exception("lock mode not provided!");
68 68
		}
69 69

  
70
		String zkConnectionString = context.getConfiguration().get(
70
		String zkConnectionString = conf.get(
71 71
				ZKFailoverController.ZK_QUORUM_KEY);
72 72
		if (zkConnectionString==null || zkConnectionString.isEmpty()) {
73 73
			throw new Exception("zookeeper quorum is unknown, invalid " + 
modules/icm-iis-common/branches/IIS-CDH-5.3.0/src/main/resources/eu/dnetlib/iis/common/protobuf/converter/avro_to_protobuf/oozie_app/workflow.xml
20 20
        </property>
21 21
	</parameters>
22 22
              
23
    <start to="converter" />
23
    <start to="generate-schema" />
24 24

  
25
	<action name="generate-schema">
26
	    <java>
27
    		<job-tracker>${jobTracker}</job-tracker>
28
        	<name-node>${nameNode}</name-node>
29
	        <main-class>eu.dnetlib.iis.core.javamapreduce.hack.AvroSchemaGenerator</main-class>
30
	        <arg>${param_avro_input_class}</arg>
31
	        <capture-output />
32
	    </java>
33
	    <ok to="converter" />
34
	    <error to="fail" />
35
	</action>
36

  
25 37
    <action name="converter">
26 38
        <map-reduce>
27 39
            <job-tracker>${jobTracker}</job-tracker>
......
33 45
            <configuration>
34 46
                <property>
35 47
                    <name>mapreduce.inputformat.class</name>
36
                    <value>eu.dnetlib.iis.core.javamapreduce.hack.KeyInputFormat</value>
48
                    <value>org.apache.avro.mapreduce.AvroKeyInputFormat</value>
37 49
                </property>
38 50
                <property>
39 51
                    <name>mapreduce.outputformat.class</name>
......
93 105
                    <value>${param_converter_class}</value>
94 106
                </property>
95 107

  
96

  
97 108
                <!-- ## Schemas -->
98 109
                <property>
99
                    <name>eu.dnetlib.iis.avro.input.class</name>
100
                    <value>${param_avro_input_class}</value>
110
                    <name>avro.schema.input.key</name>
111
                    <value>${wf:actionData('generate-schema')[wf:conf('param_avro_input_class')]}</value>
101 112
                </property>
102 113

  
103
                <property>
104
                    <name>eu.dnetlib.iis.avro.map.output.value.class</name>
105
                    <value>org.apache.avro.Schema.Type.NULL</value>
106
                </property>
107

  
108
                <!-- ### Schema of the data produced by the reducer. -->
109
                <property>
110
                    <name>eu.dnetlib.iis.avro.output.class</name>
111
                    <value>org.apache.avro.Schema.Type.NULL</value>
112
                </property>
113

  
114 114
                <!-- ## Specification of the input and output data store -->
115 115
                <property>
116 116
                    <name>mapred.input.dir</name>
modules/icm-iis-common/branches/IIS-CDH-5.3.0/pom.xml
3 3
	<parent>
4 4
    		<groupId>eu.dnetlib</groupId>
5 5
	        <artifactId>dnet-hadoop-parent</artifactId>
6
            <version>1.0.0</version>
6
            <version>1.0.0-SNAPSHOT</version>
7 7
	</parent>
8 8
	<modelVersion>4.0.0</modelVersion>
9 9
	<artifactId>icm-iis-common</artifactId>

Also available in: Unified diff