Project

General

Profile

1
package eu.dnetlib.data.objectstore.s3;
2

    
3
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.collect.Lists;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import eu.dnetlib.data.objectstore.modular.ObjectStoreRecord;
import eu.dnetlib.data.objectstore.modular.connector.ObjectStore;
import eu.dnetlib.data.objectstore.rmi.MetadataObjectRecord;
import eu.dnetlib.data.objectstore.rmi.ObjectStoreFile;
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
import eu.dnetlib.enabling.resultset.ResultSetListener;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.Document;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
39

    
40
public class S3ObjectStore implements ObjectStore {
41

    
42
    //CONSTANT VARIABLE NAME
43
    private static final String S3_REGION = "eu-west-3";
44
    private static final String URI_FIELD = "uri";
45
    private static final String ID_FIELD = "id";
46
    private static final String MIME_FIELD = "mime";
47
    private static final String ORIGINAL_OBJECT_FIELD = "originalObject";
48
    private static final String TIMESTAMP_FIELD = "timestamp";
49
    private static final String MD5_FIELD = "md5Sum";
50
    private static final String SIZE_FIELD = "size";
51

    
52

    
53
    private final String id;
54
    private final String interpretation;
55

    
56
    private final String s3AccessKey;
57
    private final String s3SecretKey;
58
    private final String s3EndPoint;
59
    private final String objectStoreBucket;
60

    
61
    private final String basePath;
62

    
63

    
64
    private final MongoCollection<Document> mongoCollection;
65

    
66
//    private AmazonS3 client;
67

    
68
    private static final Log log = LogFactory.getLog(S3ObjectStore.class);
69

    
70
    public S3ObjectStore(final String identifier, final String interpretation, final String s3AccessKey, final String s3SecretKey, final String s3EndPoint, final String objectStoreBucket, final MongoCollection<Document> mongoCollection, final String basePath) throws ObjectStoreServiceException {
71
        this.id = identifier;
72
        this.interpretation = interpretation;
73
        this.s3AccessKey = s3AccessKey;
74
        this.s3SecretKey = s3SecretKey;
75
        this.mongoCollection = mongoCollection;
76
        this.s3EndPoint = s3EndPoint;
77
        this.objectStoreBucket = objectStoreBucket;
78
        this.basePath = basePath;
79
    }
80

    
81

    
82
    private AmazonS3 initializeClient() throws ObjectStoreServiceException {
83
        try {
84
            final AWSCredentials credentials = new BasicAWSCredentials(this.s3AccessKey, this.s3SecretKey);
85
            final ClientConfiguration cfg = new ClientConfiguration().withProtocol(Protocol.HTTPS);
86
            final AmazonS3 s3 = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(credentials))
87
                    .withClientConfiguration(cfg)
88
                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(this.s3EndPoint, S3_REGION))
89
                    .build();
90
            if (s3 == null)
91
                throw new ObjectStoreServiceException("Cannot initialize s3 client because is NULL");
92
            return s3;
93
        } catch (Throwable e) {
94
            log.error("An Error happen on initialize client ", e);
95
            throw new ObjectStoreServiceException("Cannot initialize s3 client", e);
96
        }
97
    }
98

    
99

    
100
    @Override
101
    public String getId() {
102
        return this.id;
103
    }
104

    
105
    @Override
106
    public String getInterpretation() {
107
        return this.interpretation;
108
    }
109

    
110
    @Override
111
    public int feed(Iterable<ObjectStoreRecord> iterable, boolean upsert) throws ObjectStoreServiceException {
112
        final AtomicInteger count = new AtomicInteger();
113
        iterable.forEach(objectStoreRecord -> {
114
            try {
115
                feedObjectRecord(objectStoreRecord);
116
                count.incrementAndGet();
117
            } catch (ObjectStoreServiceException e) {
118
                log.error("Error on saving file in a temporary Folder");
119
            }
120
        });
121
        return count.intValue();
122
    }
123

    
124
    @Override
125
    public int feedMetadataRecord(Iterable<MetadataObjectRecord> iterable, boolean b) throws ObjectStoreServiceException {
126
        final AtomicInteger count = new AtomicInteger();
127
        iterable.forEach(mor -> {
128
            final ObjectStoreRecord r = new ObjectStoreRecord();
129
            r.setInputStream(new ByteArrayInputStream(mor.getRecord().getBytes()));
130
            final ObjectStoreFile fileMetadata = new ObjectStoreFile();
131
            fileMetadata.setObjectID(mor.getId());
132
            fileMetadata.setMimeType(mor.getMime());
133
            r.setFileMetadata(fileMetadata);
134
            try {
135
                feedObjectRecord(r);
136
                count.incrementAndGet();
137
            } catch (ObjectStoreServiceException e) {
138
                log.error("Unable to store record r", e);
139
            }
140
        });
141
        return count.intValue();
142
    }
143

    
144
    @Override
145
    public String feedObjectRecord(ObjectStoreRecord objectStoreRecord) throws ObjectStoreServiceException {
146
        final AmazonS3 client = initializeClient();
147
        try {
148
            long start = System.currentTimeMillis();
149
            final Path result = saveAndGenerateMD5(objectStoreRecord.getInputStream(), basePath);
150
            log.debug("Total time to download into fileSystem " + (System.currentTimeMillis() - start));
151
            if (result!= null) {
152
                log.debug("Saving object with ID: " + objectStoreRecord.getFileMetadata().getObjectID() + " on s3 ");
153
                client.putObject(objectStoreBucket, id + "/" + objectStoreRecord.getFileMetadata().getObjectID(), result.toFile());
154
                log.debug("Total time to put into the ObjectStore " + (System.currentTimeMillis() - start));
155
                log.debug("Deleting file");
156
                boolean deleteSuccess = Files.deleteIfExists(result);
157
                log.debug("Temporary file deleting success " + deleteSuccess);
158
                log.debug("Saved object on s3 ");
159
                final S3Object s3Object = client.getObject(objectStoreBucket ,id+ "/" + objectStoreRecord.getFileMetadata().getObjectID());
160
                double timestamp = System.currentTimeMillis();
161
                Document metadata = new Document()
162
                        .append(ID_FIELD, objectStoreRecord.getFileMetadata().getObjectID())
163
                        .append(MIME_FIELD, objectStoreRecord.getFileMetadata().getMimeType())
164
                        .append(ORIGINAL_OBJECT_FIELD, objectStoreRecord.getFileMetadata().toJSON())
165
                        .append(TIMESTAMP_FIELD, timestamp)
166
                        .append(MD5_FIELD, s3Object.getObjectMetadata().getETag())
167
                        .append(SIZE_FIELD, s3Object.getObjectMetadata().getContentLength())
168
                        .append(URI_FIELD, String.format("s3://%s/%s/%s", objectStoreBucket, id, objectStoreRecord.getFileMetadata().getObjectID()));
169
                log.debug("saving metadata object to the collection: " + metadata.toString());
170
                start = System.currentTimeMillis();
171
                mongoCollection.insertOne(metadata);
172
                log.debug("Total time to save in Mongo " + (System.currentTimeMillis() - start));
173

    
174
            }
175
        } catch (Throwable e) {
176
            log.error("Error on put file in the objectStore", e);
177
            log.error(ExceptionUtils.getStackTrace(e));
178
            throw new ObjectStoreServiceException(e);
179
        }
180
        return null;
181
    }
182

    
183
    @Override
184
    public ResultSetListener deliver(final Long from, final Long until) throws ObjectStoreServiceException {
185
        S3ObjectStoreResultSetListener resultSet = new S3ObjectStoreResultSetListener();
186
        resultSet.setMongoCollection(this.mongoCollection);
187
        resultSet.setObjectStoreID(getId());
188
        resultSet.setFromDate(from);
189
        resultSet.setUntilDate(until);
190
        return resultSet;
191
    }
192

    
193
    @Override
194
    public ResultSetListener deliverIds(final Iterable<String> ids) throws ObjectStoreServiceException {
195
        S3ObjectStoreResultSetListener resultSet = new S3ObjectStoreResultSetListener();
196
        resultSet.setMongoCollection(this.mongoCollection);
197
        resultSet.setObjectStoreID(getId());
198
        resultSet.setRecords(Lists.newArrayList(ids));
199
        return resultSet;
200
    }
201

    
202
    @Override
203
    public ObjectStoreFile deliverObject(String objectId) throws ObjectStoreServiceException {
204
        Document resultQuery = this.mongoCollection.find(Filters.eq("id", objectId)).first();
205
        if (resultQuery!= null)
206
            return ObjectStoreS3Utility.build(resultQuery);
207
        else return null;
208
    }
209

    
210
    @Override
211
    public int getSize() throws ObjectStoreServiceException {
212
        return (int) this.mongoCollection.count();
213
    }
214

    
215
    @Override
216
    public void deleteObject(String objectId) throws ObjectStoreServiceException {
217
        final Document response =this.mongoCollection.findOneAndDelete(Filters.eq("id", objectId));
218
        if (response == null)
219
            throw new ObjectStoreServiceException("Error document not found with objectId: "+objectId);
220

    
221
        final AmazonS3 client = initializeClient();
222

    
223
        client.deleteObject(this.objectStoreBucket, String.format("%s/%s", this.id, response.get(ID_FIELD)));
224
    }
225

    
226
    @Override
227
    public String getObject(String objectId) throws ObjectStoreServiceException {
228

    
229
        Document response = this.mongoCollection.find(Filters.eq("id", objectId)).first();
230
        if (response == null || !response.containsKey(URI_FIELD))
231
            return null;
232
        return response.getString(URI_FIELD);
233
    }
234

    
235
    @Override
236
    public boolean existIDStartsWith(String startId) throws ObjectStoreServiceException {
237
        return this.mongoCollection.count(Filters.regex("id", Pattern.compile(startId))) > 0;
238
    }
239

    
240
    @Override
241
    public boolean dropContent() throws ObjectStoreServiceException {
242
        final AmazonS3 client = initializeClient();
243
        final ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(objectStoreBucket).withPrefix(id);
244
        ListObjectsV2Result result;
245
        do {
246
            result = client.listObjectsV2(req);
247

    
248
            for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
249
                log.debug(String.format(" - %s (size: %d)\n", objectSummary.getKey(), objectSummary.getSize()));
250
                client.deleteObject(objectStoreBucket, objectSummary.getKey());
251
                log.debug("Object Deleted");
252
            }
253
            String token = result.getNextContinuationToken();
254
            log.debug("Next Continuation Token: " + token);
255
            req.setContinuationToken(token);
256
        } while (result.isTruncated());
257

    
258
        this.mongoCollection.drop();
259
        return true;
260
    }
261

    
262
    private static Path saveAndGenerateMD5(final InputStream inputStream, final String basePath) throws IOException {
263
        if (inputStream == null) return null;
264
        Path tempFile = Paths.get(basePath,  UUID.randomUUID().toString());
265
        log.debug("Temp file On Default Location: " + tempFile.toString());
266
        long size = 0;
267
        try {
268
            Files.copy(inputStream, tempFile, StandardCopyOption.REPLACE_EXISTING);
269
        } catch (IOException e1) {
270
            log.error(e1);
271
        }
272
        return tempFile;
273
    }
274
}
(4-4/6)