
Revision 28252

Added by Eri Katsari over 10 years ago

modules/dnet-openaire-stats/trunk/src/test/java/eu/dnetlib/data/mapreduce/hbase/statsExport/utils/StatsOozieWorkflow.xml
-<workflow-app name="test-core_examples_javamapreduce_cloner_with_multiple_output"
+<workflow-app
+	name="test-core_examples_javamapreduce_cloner_with_multiple_output"
 	xmlns="uri:oozie:workflow:0.3">
-	<!-- This example writes to 2 datastores: person and documents. The class responsible
-		for writing multiple datastores is: eu.dnetlib.iis.core.examples.javamapreduce.PersonClonerMapperMultipleOutput.
-		-->
-	<start to="mr_exporter"/>
+	<!-- map reduce job that exports hbase data and prepares them for import
+		to the relation database used for statistics generation -->
+	<start to="mr_exporter" />
 	<action name="mr_exporter">
 		<map-reduce>
 			<job-tracker>${jobTracker}</job-tracker>
 			<name-node>${nameNode}</name-node>
-			<!-- The data generated by this node in the previous run is deleted in this section
-				-->
+			<!-- The data generated by this node in the previous run is deleted in
+				this section -->
 			<prepare>
-				<delete path="${nameNode}${workingDir}/mr_cloner" />
+				<delete path="${nameNode}${workingDir}/${outputPath}" />
 			</prepare>
-			<!-- That's a multiple output MapReduce job, so no need to create mr_cloner directory,
-				since it will be created by MapReduce /> -->
+			<!-- That's a multiple output MapReduce job, so no need to create mr_cloner
+				directory, since it will be created by MapReduce /> -->
 			<configuration>
-
-				<!-- # Standard set of options that stays the same regardless of a concrete definition
-					of map-reduce job -->
-
+
+				<!-- # Standard set of options that stays the same regardless of a concrete
+					definition of map-reduce job -->
+
 				<!-- ## Various options -->
-
-				<!--This property seems to not be needed -->
-				<!--<property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property>
-					-->
+				<!-- <PARAM name="index.entity.links" required="true" description="entity
+					joiner configuration" /> -->
+				<!-- <PARAM name="contextmap" required="true" description="context map
+					(ContextDSResources)" /> -->
+
 				<property>
 					<name>mapreduce.inputformat.class</name>
-					<value>eu.dnetlib.iis.core.javamapreduce.hack.KeyInputFormat</value>
+					<value>org.apache.hadoop.hbase.mapreduce.TableInputFormat</value>
 				</property>
 				<property>
 					<name>mapred.mapoutput.key.class</name>
-					<value>org.apache.avro.mapred.AvroKey</value>
+					<value>org.apache.hadoop.io.Text</value>
 				</property>
 				<property>
 					<name>mapred.mapoutput.value.class</name>
-					<value>org.apache.avro.mapred.AvroValue</value>
+					<value>org.apache.hadoop.hbase.io.ImmutableBytesWritable</value>
 				</property>
 				<property>
 					<name>mapred.output.key.class</name>
-					<value>org.apache.avro.mapred.AvroKey</value>
+					<value>org.apache.hadoop.io.Text</value>
 				</property>
 				<property>
 					<name>mapred.output.value.class</name>
-					<value>org.apache.avro.mapred.AvroValue</value>
-				</property>
-				<property>
-					<name>mapred.output.key.comparator.class</name>
-					<value>eu.dnetlib.iis.core.javamapreduce.hack.KeyComparator</value>
-				</property>
-				<property>
-					<name>io.serializations</name>
-					<value>
-						org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization,org.apache.avro.hadoop.io.AvroSerialization
+					<value>org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
 					</value>
 				</property>
+				<!-- ## This is required for new MapReduce API usage -->
+
 				<property>
-					<name>mapred.output.value.groupfn.class</name>
-					<value>eu.dnetlib.iis.core.javamapreduce.hack.KeyComparator</value>
-				</property>
-				<property>
-					<name>rpc.engine.org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB</name>
-					<value>org.apache.hadoop.ipc.ProtobufRpcEngine</value>
-				</property>
-
-				<!-- ## This is required for new MapReduce API usage -->
-
-				<property>
 					<name>mapred.mapper.new-api</name>
 					<value>true</value>
 				</property>
......
 					<name>mapred.reducer.new-api</name>
 					<value>true</value>
 				</property>
-
+
 				<!-- # Job-specific options -->
-
-				<!-- ## Names of all output ports -->
-
+
 				<property>
-					<name>avro.mapreduce.multipleoutputs</name>
-					<value>person age</value>
+					<name>dfs.blocksize</name>
+					<value>32M</value>
 				</property>
-
-				<!-- ## Output classes for all output ports -->
-
 				<property>
-					<name>avro.mapreduce.multipleoutputs.namedOutput.person.format</name>
-					<value>org.apache.avro.mapreduce.AvroKeyOutputFormat</value>
+					<name>mapred.output.compress</name>
+					<value>false</value>
 				</property>
+
+
 				<property>
-					<name>avro.mapreduce.multipleoutputs.namedOutput.age.format</name>
-					<value>org.apache.avro.mapreduce.AvroKeyOutputFormat</value>
+					<name>mapred.reduce.tasks.speculative.execution</name>
+					<value>false</value>
 				</property>
-
-				<!-- ## Classes of mapper and reducer -->
-
+
 				<property>
-					<name>mapreduce.map.class</name>
-					<value>eu.dnetlib.iis.core.examples.javamapreduce.PersonClonerMapper</value>
+					<name>mapred.reduce.tasks.speculative.execution</name>
+					<value>false</value>
 				</property>
 				<property>
-					<name>mapreduce.reduce.class</name>
-					<value>eu.dnetlib.iis.core.examples.javamapreduce.MultipleOutputPersonClonerReducer</value>
+					<name>mapreduce.map.speculative</name>
+					<value>false</value>
 				</property>
-
-				<!-- ## Schemas -->
-
-				<!-- ### Schema of the data ingested by the mapper. To be more precise, it's the schema
-					of Avro data passed as template parameter of the AvroKey object passed to mapper.
-					-->
+
+				<!-- I/O FORMAT -->
 				<property>
-					<name>eu.dnetlib.iis.avro.input.class</name>
-					<value>eu.dnetlib.iis.core.examples.schemas.documentandauthor.Person</value>
+					<name>mapreduce.outputformat.class</name>
+					<value>org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
+					</value>
 				</property>
-
-				<!-- ### Schemas of the data produced by the mapper -->
-
-				<!-- #### Schema of the key produced by the mapper. To be more precise, it's the schema
-					of Avro data produced by the mapper and passed forward as template paramter of
-					AvroKey object. -->
-
+
+
+				<!-- ## Names of all output ports -->
+
 				<property>
-					<name>eu.dnetlib.iis.avro.map.output.key.class</name>
-					<value>org.apache.avro.Schema.Type.STRING</value>
+					<name>mapreduce.multipleoutputs</name>
+					<value>datasourceLanguage datasource
+<!-- 					      project  -->
+<!-- 					    result organization datasourceOrganization -->
+<!-- 					    datasourceTopic projectOrganization  -->
+<!-- 					    resultClaim resultClassification resultConcept  -->
+<!-- 					    resultLanguage resultOrganization  -->
+<!-- 					    resultResult resultProject resultResult resultTopic category claim concept  -->
+<!-- 					      resultLanguage resultDatasource -->
+					    	</property>
+
+				<!-- ## Output classes for all output ports -->
+
+				 <property>
+					<name>mapreduce.multipleoutputs.namedOutput.datasource.format </name>
+					<value>TextOutputFormat.class</value>
 				</property>
-
-
-				<!-- #### Schema of the value produced by the mapper. To be more precise, it's the
-					schema of Avro data produced by the mapper and passed forward as template paramter
-					of AvroValue object. -->
-
 				<property>
-					<name>eu.dnetlib.iis.avro.map.output.value.class</name>
-					<value>eu.dnetlib.iis.core.examples.schemas.documentandauthor.Person</value>
+					<name>avro.mapreduce.multipleoutputs.namedOutput.datasourceLanguage.format</name>
+				<value>TextOutputFormat.class</value>
 				</property>
-
-				<!-- ### Shema of multiple output ports. -->
-
+
+				<!-- ## Classes of mapper and reducer -->
+
 				<property>
-					<name>eu.dnetlib.iis.avro.multipleoutputs.class.person</name>
-					<value>eu.dnetlib.iis.core.examples.schemas.documentandauthor.Person</value>
+					<name>mapreduce.map.class</name>
+					<value>eu.dnetlib.data.mapreduce.hbase.statsExport.StatsMapper
+					</value>
 				</property>
-
 				<property>
-					<name>eu.dnetlib.iis.avro.multipleoutputs.class.age</name>
-					<value>eu.dnetlib.iis.core.examples.schemas.documentandauthor.PersonAge</value>
+					<name>mapreduce.reduce.class</name>
+					<value>eu.dnetlib.data.mapreduce.hbase.statsExport.StatsReducer
+					</value>
 				</property>
-
+
+
 				<!-- ## Specification of the input and output data store -->
-
+
+				<!--delim character used to seperate fields in hdfs dump files -->
 				<property>
-					<name>mapred.input.dir</name>
-					<value>${workingDir}/data_producer/person</value>
+					<name>mapred.output.delim</name>
+					<value>${Stats.delimCharacter}</value>
 				</property>
-				<!-- This directory does not correspond to a data store. In fact, this directory only
-					contains multiple data stores. It has to be set to the name of the workflow node.-->
+				<!--default string for Null String Values -->
 				<property>
-					<name>mapred.output.dir</name>
-					<value>${workingDir}/mr_cloner</value>
+					<name>mapred.output.nullString</name>
+					<value>${Stats.nullStringField}</value>
 				</property>
-
-				<!-- ## Workflow node parameters -->
-
+
+				<!--default string for Null Numeric Values -->
 				<property>
-					<name>copiesCount</name>
-					<value>2</value>
+					<name>mapred.output.nullNum</name>
+					<value>${Stats.nullNumericField}</value>
 				</property>
+				<!--source hbase table -->
 				<property>
-					<name>reducerCopiesCount</name>
-					<value>3</value>
+					<name>hbase.mapreduce.inputtable</name>
+					<value>${Stats.HbaseSourceTable}</value>
 				</property>
-			</configuration>
-		</map-reduce>
-		<ok to="db_prepare" />
-		<error to="fail" />
-	</action>
-
-	<!-- cloner works on duplicated data -->
-	<action name="db_prepare">
-		<java>
-			<job-tracker>${jobTracker}</job-tracker>
-			<name-node>${nameNode}</name-node>
-			<!-- The data generated by this node is deleted in this section -->
-			<prepare>
-				<delete path="${nameNode}${workingDir}/cloner" />
-				<mkdir path="${nameNode}${workingDir}/cloner" />
-			</prepare>
-			<configuration>
+
+				<!-- This directory does not correspond to a data store. In fact, this
+					directory only contains multiple data stores. It has to be set to the name
+					of the workflow node. -->
 				<property>
-					<name>mapred.job.queue.name</name>
-					<value>${queueName}</value>
+					<name>mapred.output.dir</name>
+					<value>${nameNode}${workingDir}/${Stats.outputPath}</value>
 				</property>
-			</configuration>
-			<!-- This is simple wrapper for the Java code -->
-			<main-class>eu.dnetlib.iis.core.java.ProcessWrapper</main-class>
-			<!-- The business Java code that gets to be executed -->
-			<arg>eu.dnetlib.iis.core.examples.java.PersonCloner</arg>
-			<!-- All input and output ports have to be bound to paths in HDFS, working directory
-				has to be specified as well -->
-			<arg>-SworkingDir=${workingDir}/cloner/working_dir</arg>
-			<arg>-Iperson=${workingDir}/mr_cloner/person</arg>
-			<arg>-Operson=${workingDir}/cloner/person</arg>
-		</java>
-		<ok to="mr_SqoopImport" />
-		<error to="fail" />
-	</action>
-
-
-	<action name="mr_SqoopImport">
-		<java>
-			<job-tracker>${jobTracker}</job-tracker>
-			<name-node>${nameNode}</name-node>
-			<!-- The data generated by this node is deleted in this section -->
-			<prepare>
-				<delete path="${nameNode}${workingDir}/cloner" />
-				<mkdir path="${nameNode}${workingDir}/cloner" />
-			</prepare>
-			<configuration>
+
+				<!-- ## Workflow node parameters -->
+
 				<property>
-					<name>mapred.job.queue.name</name>
-					<value>${queueName}</value>
+					<name>copiesCount</name>
+					<value>1</value>
 				</property>
-			</configuration>
-			<!-- This is simple wrapper for the Java code -->
-			<main-class>eu.dnetlib.iis.core.java.ProcessWrapper</main-class>
-			<!-- The business Java code that gets to be executed -->
-			<arg>eu.dnetlib.iis.core.examples.java.PersonCloner</arg>
-			<!-- All input and output ports have to be bound to paths in HDFS, working directory
-				has to be specified as well -->
-			<arg>-SworkingDir=${workingDir}/cloner/working_dir</arg>
-			<arg>-Iperson=${workingDir}/mr_cloner/person</arg>
-			<arg>-Operson=${workingDir}/cloner/person</arg>
-		</java>
-		<ok to="db_finalize" />
-		<error to="fail" />
-	</action>
-
-	<action name="db_finalize">
-		<java>
-			<job-tracker>${jobTracker}</job-tracker>
-			<name-node>${nameNode}</name-node>
-			<!-- The data generated by this node is deleted in this section -->
-			<prepare>
-				<delete path="${nameNode}${workingDir}/cloner" />
-				<mkdir path="${nameNode}${workingDir}/cloner" />
-			</prepare>
-			<configuration>
 				<property>
-					<name>mapred.job.queue.name</name>
-					<value>${queueName}</value>
+					<name>reducerCopiesCount</name>
+					<value>10</value>
 				</property>
 			</configuration>
-			<!-- This is simple wrapper for the Java code -->
-			<main-class>eu.dnetlib.iis.core.java.ProcessWrapper</main-class>
-			<!-- The business Java code that gets to be executed -->
-			<arg>eu.dnetlib.iis.core.examples.java.PersonCloner</arg>
-			<!-- All input and output ports have to be bound to paths in HDFS, working directory
-				has to be specified as well -->
-			<arg>-SworkingDir=${workingDir}/cloner/working_dir</arg>
-			<arg>-Iperson=${workingDir}/mr_cloner/person</arg>
-			<arg>-Operson=${workingDir}/cloner/person</arg>
-		</java>
+		</map-reduce>
 		<ok to="end" />
 		<error to="fail" />
 	</action>
-
-
-
-
+
+	<!-- cloner works on duplicated data -->
+<!-- 	<action name="db_prepare"> -->
+<!-- 		<java> -->
+<!-- 			<job-tracker>${jobTracker}</job-tracker> -->
+<!-- 			<name-node>${nameNode}</name-node> -->
+<!-- 			<!-- The data generated by this node is deleted in this section --> -->
+<!-- 			<prepare> -->
+<!-- 				<delete path="${nameNode}${workingDir}/cloner" /> -->
+<!-- 				<mkdir path="${nameNode}${workingDir}/cloner" /> -->
+<!-- 			</prepare> -->
+<!-- 			<configuration> -->
+<!-- 				<property> -->
+<!-- 					<name>mapred.job.queue.name</name> -->
+<!-- 					<value>${queueName}</value> -->
+<!-- 				</property> -->
+<!-- 			</configuration> -->
+<!-- 			<!-- This is simple wrapper for the Java code --> -->
+<!-- 			<main-class>eu.dnetlib.iis.core.java.ProcessWrapper</main-class> -->
+<!-- 			<!-- The business Java code that gets to be executed --> -->
+<!-- 			<arg>eu.dnetlib.iis.core.examples.java.PersonCloner</arg> -->
+<!-- 			<!-- All input and output ports have to be bound to paths in HDFS, working  -->
+<!-- 				directory has to be specified as well --> -->
+<!-- 			<arg>-SworkingDir=${workingDir}/cloner/working_dir</arg> -->
+<!-- 			<arg>-Iperson=${workingDir}/mr_cloner/person</arg> -->
+<!-- 			<arg>-Operson=${workingDir}/cloner/person</arg> -->
+<!-- 		</java> -->
+<!-- 		<ok to="mr_SqoopImport" /> -->
+<!-- 		<error to="fail" /> -->
+<!-- 	</action> -->
+
+
+<!-- 	<action name="mr_SqoopImport"> -->
+<!-- 		<java> -->
+<!-- 			<job-tracker>${jobTracker}</job-tracker> -->
+<!-- 			<name-node>${nameNode}</name-node> -->
+<!-- 			<!-- The data generated by this node is deleted in this section --> -->
+<!-- 			<prepare> -->
+<!-- 				<delete path="${nameNode}${workingDir}/cloner" /> -->
+<!-- 				<mkdir path="${nameNode}${workingDir}/cloner" /> -->
+<!-- 			</prepare> -->
+<!-- 			<configuration> -->
+<!-- 				<property> -->
+<!-- 					<name>mapred.job.queue.name</name> -->
+<!-- 					<value>${queueName}</value> -->
+<!-- 				</property> -->
+<!-- 			</configuration> -->
+<!-- 			<!-- This is simple wrapper for the Java code --> -->
+<!-- 			<main-class>eu.dnetlib.iis.core.java.ProcessWrapper</main-class> -->
+<!-- 			<!-- The business Java code that gets to be executed --> -->
+<!-- 			<arg>eu.dnetlib.iis.core.examples.java.PersonCloner</arg> -->
+<!-- 			<!-- All input and output ports have to be bound to paths in HDFS, working  -->
+<!-- 				directory has to be specified as well --> -->
+<!-- 			<arg>-SworkingDir=${workingDir}/cloner/working_dir</arg> -->
+<!-- 			<arg>-Iperson=${workingDir}/mr_cloner/person</arg> -->
+<!-- 			<arg>-Operson=${workingDir}/cloner/person</arg> -->
+<!-- 		</java> -->
+<!-- 		<ok to="db_finalize" /> -->
+<!-- 		<error to="fail" /> -->
+<!-- 	</action> -->
+
+<!-- 	<action name="db_finalize"> -->
+<!-- 		<java> -->
+<!-- 			<job-tracker>${jobTracker}</job-tracker> -->
+<!-- 			<name-node>${nameNode}</name-node> -->
+<!-- 			<!-- The data generated by this node is deleted in this section --> -->
+<!-- 			<prepare> -->
+<!-- 				<delete path="${nameNode}${workingDir}/cloner" /> -->
+<!-- 				<mkdir path="${nameNode}${workingDir}/cloner" /> -->
+<!-- 			</prepare> -->
+<!-- 			<configuration> -->
+<!-- 				<property> -->
+<!-- 					<name>mapred.job.queue.name</name> -->
+<!-- 					<value>${queueName}</value> -->
+<!-- 				</property> -->
+<!-- 			</configuration> -->
+<!-- 			<!-- This is simple wrapper for the Java code --> -->
+<!-- 			<main-class>eu.dnetlib.iis.core.java.ProcessWrapper</main-class> -->
+<!-- 			<!-- The business Java code that gets to be executed --> -->
+<!-- 			<arg>eu.dnetlib.iis.core.examples.java.PersonCloner</arg> -->
+<!-- 			<!-- All input and output ports have to be bound to paths in HDFS, working  -->
+<!-- 				directory has to be specified as well --> -->
+<!-- 			<arg>-SworkingDir=${workingDir}/cloner/working_dir</arg> -->
+<!-- 			<arg>-Iperson=${workingDir}/mr_cloner/person</arg> -->
+<!-- 			<arg>-Operson=${workingDir}/cloner/person</arg> -->
+<!-- 		</java> -->
+<!-- 		<ok to="end" /> -->
+<!-- 		<error to="fail" /> -->
+<!-- 	</action> -->
+
+
+
+
 	<kill name="fail">
 		<message>
-			Unfortunately, the process failed -- error message: [${wf:errorMessage(wf:lastErrorNode())}]
+			Unfortunately, the process failed -- error message:
+			[${wf:errorMessage(wf:lastErrorNode())}]
 		</message>
 	</kill>
 	<end name="end" />
