Create an account

Very important

  • To access the important data of the forums, you must be active in each forum and especially in the leaks and database leaks section, send data and after sending the data and activity, data and important content will be opened and visible for you.
  • You will only see chat messages from people who are at or below your level.
  • More than 500,000 database leaks and millions of account leaks are waiting for you, so access and view with more activity.
  • Many important data are inactive and inaccessible for you, so open them with activity. (This will be done automatically)


Thread Rating:
  • 548 Vote(s) - 3.53 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Spring Batch: One reader, multiple processors and writers

#1
In Spring batch I need to pass the items read by an ItemReader to two different processors and writer. What I'm trying to achieve is that...

<pre>
+---> ItemProcessor#1 ---> ItemWriter#1
|
ItemReader ---> item ---+
|
+---> ItemProcessor#2 ---> ItemWriter#2
</pre>

This is needed because items written by ItemWriter#1 should be processed in a completely different way compared to the ones written by ItemWriter#2.
Moreover, ItemReader reads item from a database, and the queries it executes are so computational expensive that executing the same query twice should be discarded.

Any hint about how to achieve such set up ? Or, at least, a logically equivalent set up ?
Reply

#2
this is the solution I came up with.

So, the idea is to code a new Writer that "contains" both an ItemProcessor and an ItemWriter. Just to give you an idea, we called it PreprocessoWriter, and that's the core code.

private ItemWriter<O> writer;
private ItemProcessor<I, O> processor;

@Override
public void write(List<? extends I> items) throws Exception {
List<O> toWrite = new ArrayList<O>();
for (I item : items) {
toWrite.add(processor.process(item));
}
writer.write(toWrite);
}

There's a lot of things being left aside. Management of ItemStream, for instance. But in our particular scenario this was enough.

So you can just combine multiple PreprocessorWriter with CompositeWriter.
Reply

#3
There is an other solution if you have a reasonable amount of items (like less than 1 Go) : you can cache the result of your select into a collection wrapped in a Spring bean.

Then u can just read the collection twice with no cost.
Reply

#4
This solution is valid if your item should be processed by processor #1 and processor #2

You have to create a processor #0 with this signature:

class Processor0<Item, CompositeResultBean>

where `CompositeResultBean` is a bean defined as

class CompositeResultBean {
Processor1ResultBean result1;
Processor2ResultBean result2;
}

In your Processor #0 just delegate work to processors #1 and #2 and put result in `CompositeResultBean`

CompositeResultBean Processor0.process(Item item) {
final CompositeResultBean r = new CompositeResultBean();
r.setResult1(processor1.process(item));
r.setResult2(processor2.process(item));
return r;
}

Your own writer is a `CompositeItemWriter` that delegate to writer `CompositeResultBean.result1` or `CompositeResultBean.result2` (look at [PropertyExtractingDelegatingItemWriter](

[To see links please register here]

), maybe can help)
Reply

#5
I followed Luca's suggestion to use `PropertyExtractingDelegatingItemWriter` as writer and I was able to work with two different entities in one single step.

First of all what I did was to define a DTO that stores the two entities/results from the processor

public class DatabaseEntry {
private AccessLogEntry accessLogEntry;
private BlockedIp blockedIp;

public AccessLogEntry getAccessLogEntry() {
return accessLogEntry;
}

public void setAccessLogEntry(AccessLogEntry accessLogEntry) {
this.accessLogEntry = accessLogEntry;
}

public BlockedIp getBlockedIp() {
return blockedIp;
}

public void setBlockedIp(BlockedIp blockedIp) {
this.blockedIp = blockedIp;
}
}


Then I passed this DTO to the writer, a `PropertyExtractingDelegatingItemWriter` class where I define two customized methods to write the entities into the database, see my writer code below:

@Configuration
public class LogWriter extends LogAbstract {
@Autowired
private DataSource dataSource;

@Bean()
public PropertyExtractingDelegatingItemWriter<DatabaseEntry> itemWriterAccessLogEntry() {
PropertyExtractingDelegatingItemWriter<DatabaseEntry> propertyExtractingDelegatingItemWriter = new PropertyExtractingDelegatingItemWriter<DatabaseEntry>();
propertyExtractingDelegatingItemWriter.setFieldsUsedAsTargetMethodArguments(new String[]{"accessLogEntry", "blockedIp"});
propertyExtractingDelegatingItemWriter.setTargetObject(this);
propertyExtractingDelegatingItemWriter.setTargetMethod("saveTransaction");
return propertyExtractingDelegatingItemWriter;
}

public void saveTransaction(AccessLogEntry accessLogEntry, BlockedIp blockedIp) throws SQLException {
writeAccessLogTable(accessLogEntry);
if (blockedIp != null) {
writeBlockedIp(blockedIp);
}

}

private void writeBlockedIp(BlockedIp entry) throws SQLException {
PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO blocked_ips (ip,threshold,startDate,endDate,comment) VALUES (?,?,?,?,?)");
statement.setString(1, entry.getIp());
statement.setInt(2, threshold);
statement.setTimestamp(3, Timestamp.valueOf(startDate));
statement.setTimestamp(4, Timestamp.valueOf(endDate));
statement.setString(5, entry.getComment());
statement.execute();
}

private void writeAccessLogTable(AccessLogEntry entry) throws SQLException {
PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO log_entries (date,ip,request,status,userAgent) VALUES (?,?,?,?,?)");
statement.setTimestamp(1, Timestamp.valueOf(entry.getDate()));
statement.setString(2, entry.getIp());
statement.setString(3, entry.getRequest());
statement.setString(4, entry.getStatus());
statement.setString(5, entry.getUserAgent());
statement.execute();
}
}

With this approach you can get the wanted inital behaviour from a single reader for processing multiple entities and save them in a single step.





Reply

#6
You can use a `CompositeItemProcessor` and `CompositeItemWriter`

It won't look exactly like your schema, it will be sequential, but it will do the job.

Reply



Forum Jump:


Users browsing this thread:
1 Guest(s)

©0Day  2016 - 2023 | All Rights Reserved.  Made with    for the community. Connected through