/*
 * Finds dataset identifiers (taken from the document-to-dataset input) that
 * have no matching identifier in the document-id input, and writes each of
 * them together with the mdStoreId found for that identifier in the
 * document-to-mdstore input.
 *
 * Output records carry two fields: documentId and mdStoreId.
 * All input/output locations and Avro schema class names are supplied as
 * script parameters.
 */

-- ---------------------------------------------------------------------------
-- Avro load/store functions, each bound to its schema class parameter.
-- ---------------------------------------------------------------------------
DEFINE avro_load_document_to_dataset
    org.apache.pig.piggybank.storage.avro.AvroStorage(
        'input_schema_class', '$schema_input_document_to_dataset');

DEFINE avro_load_document_id
    org.apache.pig.piggybank.storage.avro.AvroStorage(
        'input_schema_class', '$schema_input_document_id');

DEFINE avro_load_document_to_mdstore
    org.apache.pig.piggybank.storage.avro.AvroStorage(
        'input_schema_class', '$schema_input_document_to_mdstore');

-- NOTE(review): 'index' is presumably the AvroStorage option that tells
-- separate store instances apart within one script -- confirm against the
-- piggybank AvroStorage documentation.
DEFINE avro_store_document_to_mdstore
    org.apache.pig.piggybank.storage.avro.AvroStorage(
        'index', '0',
        'output_schema_class', '$schema_output_document_to_mdstore');

-- ---------------------------------------------------------------------------
-- Inputs
-- ---------------------------------------------------------------------------

-- Identifiers already present as documents; only the first field is kept,
-- renamed to "id".
documentIdRecords = LOAD '$input_document_id' USING avro_load_document_id;
knownDocumentIds = FOREACH documentIdRecords GENERATE $0 AS id;

-- Dataset identifiers referenced by documents, de-duplicated.
documentToDatasetRecords = LOAD '$input_document_to_dataset' USING avro_load_document_to_dataset;
referencedDatasetIds = FOREACH documentToDatasetRecords GENERATE datasetId AS id;
uniqueDatasetIds = DISTINCT referencedDatasetIds;

-- Mapping documentId -> mdStoreId.
mdStoreMapping = LOAD '$input_document_to_mdstore' USING avro_load_document_to_mdstore;

-- ---------------------------------------------------------------------------
-- Processing
-- ---------------------------------------------------------------------------

-- Left outer join: dataset ids without a matching document id get a null on
-- the right-hand side.
datasetsVsDocuments = JOIN uniqueDatasetIds BY id LEFT OUTER, knownDocumentIds BY id;

-- Inner join: keeps only dataset ids that have an entry in the mdstore mapping.
datasetsWithMdStore = JOIN datasetsVsDocuments BY uniqueDatasetIds::id, mdStoreMapping BY documentId;

-- Keep only the rows where the left outer join found no document id.
unmatchedDatasets = FILTER datasetsWithMdStore BY knownDocumentIds::id IS NULL;

-- Shape the result to the output schema fields.
outputDocumentToMdstore = FOREACH unmatchedDatasets GENERATE
    uniqueDatasetIds::id AS documentId,
    mdStoreMapping::mdStoreId AS mdStoreId;

-- ---------------------------------------------------------------------------
-- Output
-- ---------------------------------------------------------------------------
STORE outputDocumentToMdstore INTO '$output_document_to_mdstore' USING avro_store_document_to_mdstore;