Revision 36940
Added by Marek Horst about 9 years ago
modules/icm-iis-common/branches/IIS-CDH-5.3.0/src/main/java/eu/dnetlib/iis/common/utils/EmptyDatastoreVerifierProcess.java | ||
---|---|---|
8 | 8 |
import java.util.Map; |
9 | 9 |
import java.util.Properties; |
10 | 10 |
|
11 |
import org.apache.hadoop.conf.Configuration; |
|
11 | 12 |
import org.apache.hadoop.fs.FileSystem; |
12 | 13 |
|
13 |
import eu.dnetlib.iis.core.java.HadoopContext; |
|
14 | 14 |
import eu.dnetlib.iis.core.java.PortBindings; |
15 | 15 |
import eu.dnetlib.iis.core.java.Ports; |
16 | 16 |
import eu.dnetlib.iis.core.java.Process; |
... | ... | |
67 | 67 |
} |
68 | 68 |
|
69 | 69 |
@Override |
70 |
public void run(PortBindings portBindings, HadoopContext context,
|
|
70 |
public void run(PortBindings portBindings, Configuration conf,
|
|
71 | 71 |
Map<String, String> parameters) throws Exception { |
72 | 72 |
CloseableIterator<?> closeableIt = DataStore.getReader( |
73 |
new FileSystemPath(FileSystem.get(context.getConfiguration()),
|
|
73 |
new FileSystemPath(FileSystem.get(conf),
|
|
74 | 74 |
portBindings.getInput().get(INPUT_PORT_NAME))); |
75 | 75 |
try { |
76 | 76 |
File file = new File(System.getProperty(OOZIE_ACTION_OUTPUT_FILENAME)); |
modules/icm-iis-common/branches/IIS-CDH-5.3.0/src/main/java/eu/dnetlib/iis/common/cache/CacheMetadataManagingProcess.java | ||
---|---|---|
10 | 10 |
import java.util.Map; |
11 | 11 |
import java.util.Properties; |
12 | 12 |
|
13 |
import org.apache.hadoop.conf.Configuration; |
|
13 | 14 |
import org.apache.hadoop.fs.FSDataInputStream; |
14 | 15 |
import org.apache.hadoop.fs.FSDataOutputStream; |
15 | 16 |
import org.apache.hadoop.fs.FileSystem; |
... | ... | |
19 | 20 |
import com.google.gson.stream.JsonWriter; |
20 | 21 |
|
21 | 22 |
import eu.dnetlib.iis.common.FsShellPermissions; |
22 |
import eu.dnetlib.iis.core.java.HadoopContext; |
|
23 | 23 |
import eu.dnetlib.iis.core.java.PortBindings; |
24 | 24 |
import eu.dnetlib.iis.core.java.porttype.PortType; |
25 | 25 |
|
... | ... | |
79 | 79 |
return Collections.emptyMap(); |
80 | 80 |
} |
81 | 81 |
|
82 |
protected String getExistingCacheId(HadoopContext context, Map<String, String> parameters) throws IOException {
|
|
82 |
protected String getExistingCacheId(Configuration conf, Map<String, String> parameters) throws IOException {
|
|
83 | 83 |
if (parameters.containsKey(PARAM_CACHE_DIR)) { |
84 |
FileSystem fs = FileSystem.get(context.getConfiguration());
|
|
84 |
FileSystem fs = FileSystem.get(conf);
|
|
85 | 85 |
Path cacheFilePath = new Path(parameters.get(PARAM_CACHE_DIR), |
86 | 86 |
DEFAULT_METAFILE_NAME); |
87 | 87 |
if (fs.exists(cacheFilePath)) { |
... | ... | |
106 | 106 |
} |
107 | 107 |
} |
108 | 108 |
|
109 |
protected String generateNewCacheId(HadoopContext context, Map<String, String> parameters) throws IOException {
|
|
109 |
protected String generateNewCacheId(Configuration conf, Map<String, String> parameters) throws IOException {
|
|
110 | 110 |
if (parameters.containsKey(PARAM_CACHE_DIR)) { |
111 |
FileSystem fs = FileSystem.get(context.getConfiguration());
|
|
111 |
FileSystem fs = FileSystem.get(conf);
|
|
112 | 112 |
Path cacheFilePath = new Path(parameters.get(PARAM_CACHE_DIR), |
113 | 113 |
DEFAULT_METAFILE_NAME); |
114 | 114 |
CacheMeta cachedMeta = null; |
... | ... | |
137 | 137 |
} |
138 | 138 |
} |
139 | 139 |
|
140 |
protected void writeCacheId(HadoopContext context, Map<String, String> parameters) throws IOException {
|
|
140 |
protected void writeCacheId(Configuration conf, Map<String, String> parameters) throws IOException {
|
|
141 | 141 |
if (parameters.containsKey(PARAM_CACHE_DIR)) { |
142 | 142 |
if (parameters.containsKey(PARAM_ID)) { |
143 | 143 |
String cacheId = parameters.get(PARAM_ID); |
144 |
FileSystem fs = FileSystem.get(context.getConfiguration());
|
|
144 |
FileSystem fs = FileSystem.get(conf);
|
|
145 | 145 |
Path cacheFilePath = new Path(parameters.get(PARAM_CACHE_DIR), |
146 | 146 |
DEFAULT_METAFILE_NAME); |
147 | 147 |
CacheMeta cachedMeta = null; |
... | ... | |
174 | 174 |
outputStream.close(); |
175 | 175 |
// changing file permission to +rw to allow writing for different users |
176 | 176 |
FsShellPermissions.changePermissions(fs, |
177 |
context.getConfiguration(),
|
|
177 |
conf,
|
|
178 | 178 |
FsShellPermissions.Op.CHMOD, |
179 | 179 |
false, "0666", cacheFilePath.toString()); |
180 | 180 |
} |
... | ... | |
209 | 209 |
} |
210 | 210 |
|
211 | 211 |
@Override |
212 |
public void run(PortBindings portBindings, HadoopContext context,
|
|
212 |
public void run(PortBindings portBindings, Configuration conf,
|
|
213 | 213 |
Map<String, String> parameters) throws Exception { |
214 | 214 |
String mode = parameters.get(PARAM_MODE); |
215 | 215 |
File file = new File(System.getProperty(OOZIE_ACTION_OUTPUT_FILENAME)); |
216 | 216 |
Properties props = new Properties(); |
217 | 217 |
if (MODE_READ_CURRENT_ID.equals(mode)) { |
218 | 218 |
props.setProperty(OUTPUT_PROPERTY_CACHE_ID, |
219 |
getExistingCacheId(context, parameters));
|
|
219 |
getExistingCacheId(conf, parameters));
|
|
220 | 220 |
} else if (MODE_GENERATE_NEW_ID.equals(mode)) { |
221 | 221 |
props.setProperty(OUTPUT_PROPERTY_CACHE_ID, |
222 |
generateNewCacheId(context, parameters));
|
|
222 |
generateNewCacheId(conf, parameters));
|
|
223 | 223 |
} else if (MODE_WRITE_ID.equals(mode)) { |
224 |
writeCacheId(context, parameters);
|
|
224 |
writeCacheId(conf, parameters);
|
|
225 | 225 |
} else { |
226 | 226 |
throw new RuntimeException("unsupported mode: " + mode); |
227 | 227 |
} |
modules/icm-iis-common/branches/IIS-CDH-5.3.0/src/main/java/eu/dnetlib/iis/common/oozie/property/ConditionalPropertySetter.java | ||
---|---|---|
7 | 7 |
import java.util.Map; |
8 | 8 |
import java.util.Properties; |
9 | 9 |
|
10 |
import eu.dnetlib.iis.core.java.HadoopContext; |
|
10 |
import org.apache.hadoop.conf.Configuration; |
|
11 |
|
|
11 | 12 |
import eu.dnetlib.iis.core.java.PortBindings; |
12 | 13 |
import eu.dnetlib.iis.core.java.Process; |
13 | 14 |
import eu.dnetlib.iis.core.java.porttype.PortType; |
... | ... | |
49 | 50 |
} |
50 | 51 |
|
51 | 52 |
@Override |
52 |
public void run(PortBindings portBindings, HadoopContext context,
|
|
53 |
public void run(PortBindings portBindings, Configuration conf,
|
|
53 | 54 |
Map<String, String> parameters) throws Exception { |
54 | 55 |
|
55 | 56 |
String condition = parameters.get(PARAM_CONDITION); |
modules/icm-iis-common/branches/IIS-CDH-5.3.0/src/main/java/eu/dnetlib/iis/common/oozie/property/JobPropertiesDumperProcess.java | ||
---|---|---|
1 |
package eu.dnetlib.iis.common.oozie.property; |
|
2 |
|
|
3 |
import java.util.Collections; |
|
4 |
import java.util.HashMap; |
|
5 |
import java.util.Map; |
|
6 |
import java.util.Properties; |
|
7 |
|
|
8 |
import org.apache.hadoop.conf.Configuration; |
|
9 |
import org.apache.hadoop.fs.FSDataOutputStream; |
|
10 |
import org.apache.hadoop.fs.FileSystem; |
|
11 |
import org.apache.hadoop.fs.Path; |
|
12 |
|
|
13 |
import eu.dnetlib.iis.core.java.PortBindings; |
|
14 |
import eu.dnetlib.iis.core.java.porttype.AnyPortType; |
|
15 |
import eu.dnetlib.iis.core.java.porttype.PortType; |
|
16 |
|
|
17 |
/** |
|
18 |
* Job properties dumper process. |
|
19 |
* Writes all properties to job.properties file stored in output directory provided as input parameter. |
|
20 |
* |
|
21 |
* @author mhorst |
|
22 |
* |
|
23 |
*/ |
|
24 |
public class JobPropertiesDumperProcess implements eu.dnetlib.iis.core.java.Process { |
|
25 |
|
|
26 |
public static final String OUTPUT_PORT = "output"; |
|
27 |
|
|
28 |
private static final Map<String, PortType> outputPorts = new HashMap<String, PortType>(); |
|
29 |
|
|
30 |
{ |
|
31 |
outputPorts.put(OUTPUT_PORT, new AnyPortType()); |
|
32 |
} |
|
33 |
|
|
34 |
@Override |
|
35 |
public void run(PortBindings portBindings, Configuration conf, |
|
36 |
Map<String, String> parameters) throws Exception { |
|
37 |
Properties props = new Properties(); |
|
38 |
for(Map.Entry<String,String> property : conf) { |
|
39 |
props.setProperty(property.getKey(), |
|
40 |
property.getValue()); |
|
41 |
} |
|
42 |
FileSystem fs = FileSystem.get(conf); |
|
43 |
FSDataOutputStream outputStream = fs.create(new Path( |
|
44 |
portBindings.getOutput().get(OUTPUT_PORT), |
|
45 |
"job.properties"), true); |
|
46 |
try { |
|
47 |
props.store(outputStream, ""); |
|
48 |
} finally { |
|
49 |
outputStream.close(); |
|
50 |
} |
|
51 |
} |
|
52 |
|
|
53 |
@Override |
|
54 |
public Map<String, PortType> getInputPorts() { |
|
55 |
return Collections.emptyMap(); |
|
56 |
} |
|
57 |
|
|
58 |
@Override |
|
59 |
public Map<String, PortType> getOutputPorts() { |
|
60 |
return outputPorts; |
|
61 |
} |
|
62 |
|
|
63 |
} |
|
0 | 64 |
modules/icm-iis-common/branches/IIS-CDH-5.3.0/src/main/java/eu/dnetlib/iis/common/oozie/property/OoziePropertiesDumperProcess.java | ||
---|---|---|
1 |
package eu.dnetlib.iis.common.oozie.property; |
|
2 |
|
|
3 |
import java.util.Collections; |
|
4 |
import java.util.HashMap; |
|
5 |
import java.util.Map; |
|
6 |
import java.util.Properties; |
|
7 |
|
|
8 |
import org.apache.hadoop.conf.Configuration; |
|
9 |
import org.apache.hadoop.fs.FSDataOutputStream; |
|
10 |
import org.apache.hadoop.fs.FileSystem; |
|
11 |
import org.apache.hadoop.fs.Path; |
|
12 |
|
|
13 |
import eu.dnetlib.iis.core.java.PortBindings; |
|
14 |
import eu.dnetlib.iis.core.java.porttype.AnyPortType; |
|
15 |
import eu.dnetlib.iis.core.java.porttype.PortType; |
|
16 |
|
|
17 |
/** |
|
18 |
* Oozie properties dumper process. |
|
19 |
* Writes all oozie parameters to job.properties file stored in output directory provided as input parameter. |
|
20 |
* |
|
21 |
* @author mhorst |
|
22 |
* |
|
23 |
*/ |
|
24 |
public class OoziePropertiesDumperProcess implements eu.dnetlib.iis.core.java.Process { |
|
25 |
|
|
26 |
public static final String OUTPUT_PORT = "output"; |
|
27 |
|
|
28 |
private static final Map<String, PortType> outputPorts = new HashMap<String, PortType>(); |
|
29 |
|
|
30 |
{ |
|
31 |
outputPorts.put(OUTPUT_PORT, new AnyPortType()); |
|
32 |
} |
|
33 |
|
|
34 |
@Override |
|
35 |
public void run(PortBindings portBindings, Configuration jobConf, |
|
36 |
Map<String, String> parameters) throws Exception { |
|
37 |
|
|
38 |
Configuration actionConf = new Configuration(false); |
|
39 |
String actionConfLocation = System.getProperty("oozie.action.conf.xml"); |
|
40 |
System.out.println("loading properties from: " + actionConfLocation); |
|
41 |
actionConf.addResource(new Path("file:///", actionConfLocation)); |
|
42 |
|
|
43 |
Properties props = new Properties(); |
|
44 |
for(Map.Entry<String,String> property : actionConf) { |
|
45 |
props.setProperty(property.getKey(), |
|
46 |
property.getValue()); |
|
47 |
} |
|
48 |
FileSystem fs = FileSystem.get(actionConf); |
|
49 |
FSDataOutputStream outputStream = fs.create(new Path( |
|
50 |
portBindings.getOutput().get(OUTPUT_PORT), |
|
51 |
"job.properties"), true); |
|
52 |
try { |
|
53 |
props.store(outputStream, ""); |
|
54 |
} finally { |
|
55 |
outputStream.close(); |
|
56 |
} |
|
57 |
} |
|
58 |
|
|
59 |
@Override |
|
60 |
public Map<String, PortType> getInputPorts() { |
|
61 |
return Collections.emptyMap(); |
|
62 |
} |
|
63 |
|
|
64 |
@Override |
|
65 |
public Map<String, PortType> getOutputPorts() { |
|
66 |
return outputPorts; |
|
67 |
} |
|
68 |
|
|
69 |
} |
|
0 | 70 |
modules/icm-iis-common/branches/IIS-CDH-5.3.0/src/main/java/eu/dnetlib/iis/common/lock/LockManagingProcess.java | ||
---|---|---|
4 | 4 |
import java.util.Map; |
5 | 5 |
import java.util.concurrent.Semaphore; |
6 | 6 |
|
7 |
import org.apache.hadoop.conf.Configuration; |
|
7 | 8 |
import org.apache.hadoop.ha.ZKFailoverController; |
8 | 9 |
import org.apache.log4j.Logger; |
9 | 10 |
import org.apache.zookeeper.CreateMode; |
... | ... | |
13 | 14 |
import org.apache.zookeeper.ZooDefs; |
14 | 15 |
import org.apache.zookeeper.ZooKeeper; |
15 | 16 |
|
16 |
import eu.dnetlib.iis.core.java.HadoopContext; |
|
17 | 17 |
import eu.dnetlib.iis.core.java.PortBindings; |
18 | 18 |
import eu.dnetlib.iis.core.java.porttype.PortType; |
19 | 19 |
|
... | ... | |
57 | 57 |
} |
58 | 58 |
|
59 | 59 |
@Override |
60 |
public void run(PortBindings portBindings, HadoopContext context,
|
|
60 |
public void run(PortBindings portBindings, Configuration conf,
|
|
61 | 61 |
Map<String, String> parameters) throws Exception { |
62 | 62 |
|
63 | 63 |
if (!parameters.containsKey(PARAM_NODE_ID)) { |
... | ... | |
67 | 67 |
throw new Exception("lock mode not provided!"); |
68 | 68 |
} |
69 | 69 |
|
70 |
String zkConnectionString = context.getConfiguration().get(
|
|
70 |
String zkConnectionString = conf.get(
|
|
71 | 71 |
ZKFailoverController.ZK_QUORUM_KEY); |
72 | 72 |
if (zkConnectionString==null || zkConnectionString.isEmpty()) { |
73 | 73 |
throw new Exception("zookeeper quorum is unknown, invalid " + |
modules/icm-iis-common/branches/IIS-CDH-5.3.0/src/main/resources/eu/dnetlib/iis/common/protobuf/converter/avro_to_protobuf/oozie_app/workflow.xml | ||
---|---|---|
20 | 20 |
</property> |
21 | 21 |
</parameters> |
22 | 22 |
|
23 |
<start to="converter" />
|
|
23 |
<start to="generate-schema" />
|
|
24 | 24 |
|
25 |
<action name="generate-schema"> |
|
26 |
<java> |
|
27 |
<job-tracker>${jobTracker}</job-tracker> |
|
28 |
<name-node>${nameNode}</name-node> |
|
29 |
<main-class>eu.dnetlib.iis.core.javamapreduce.hack.AvroSchemaGenerator</main-class> |
|
30 |
<arg>${param_avro_input_class}</arg> |
|
31 |
<capture-output /> |
|
32 |
</java> |
|
33 |
<ok to="converter" /> |
|
34 |
<error to="fail" /> |
|
35 |
</action> |
|
36 |
|
|
25 | 37 |
<action name="converter"> |
26 | 38 |
<map-reduce> |
27 | 39 |
<job-tracker>${jobTracker}</job-tracker> |
... | ... | |
33 | 45 |
<configuration> |
34 | 46 |
<property> |
35 | 47 |
<name>mapreduce.inputformat.class</name> |
36 |
<value>eu.dnetlib.iis.core.javamapreduce.hack.KeyInputFormat</value>
|
|
48 |
<value>org.apache.avro.mapreduce.AvroKeyInputFormat</value>
|
|
37 | 49 |
</property> |
38 | 50 |
<property> |
39 | 51 |
<name>mapreduce.outputformat.class</name> |
... | ... | |
93 | 105 |
<value>${param_converter_class}</value> |
94 | 106 |
</property> |
95 | 107 |
|
96 |
|
|
97 | 108 |
<!-- ## Schemas --> |
98 | 109 |
<property> |
99 |
<name>eu.dnetlib.iis.avro.input.class</name>
|
|
100 |
<value>${param_avro_input_class}</value>
|
|
110 |
<name>avro.schema.input.key</name>
|
|
111 |
<value>${wf:actionData('generate-schema')[wf:conf('param_avro_input_class')]}</value>
|
|
101 | 112 |
</property> |
102 | 113 |
|
103 |
<property> |
|
104 |
<name>eu.dnetlib.iis.avro.map.output.value.class</name> |
|
105 |
<value>org.apache.avro.Schema.Type.NULL</value> |
|
106 |
</property> |
|
107 |
|
|
108 |
<!-- ### Schema of the data produced by the reducer. --> |
|
109 |
<property> |
|
110 |
<name>eu.dnetlib.iis.avro.output.class</name> |
|
111 |
<value>org.apache.avro.Schema.Type.NULL</value> |
|
112 |
</property> |
|
113 |
|
|
114 | 114 |
<!-- ## Specification of the input and output data store --> |
115 | 115 |
<property> |
116 | 116 |
<name>mapred.input.dir</name> |
modules/icm-iis-common/branches/IIS-CDH-5.3.0/pom.xml | ||
---|---|---|
3 | 3 |
<parent> |
4 | 4 |
<groupId>eu.dnetlib</groupId> |
5 | 5 |
<artifactId>dnet-hadoop-parent</artifactId> |
6 |
<version>1.0.0</version> |
|
6 |
<version>1.0.0-SNAPSHOT</version>
|
|
7 | 7 |
</parent> |
8 | 8 |
<modelVersion>4.0.0</modelVersion> |
9 | 9 |
<artifactId>icm-iis-common</artifactId> |
Also available in: Unified diff
merging trunk changes with IIS-CDH-5.3.0 branch