What does de-batching do?
Debatching polls for incoming files, and when it finds a file and starts reading it, it doesn't start a single new instance of a BPEL process and delivers the file contents, but instead it chops up the file in "batches" of configurable size (typically 1), and starts a new instance for each batch. So for a file of 1000 records (lines) and a batch size of 1, you will end up with 1000 new instances. There is no guarantee that these 1000 instances will be executed in order of the records in the original file, mind you. These instances will be processed by the pool of engine threads just like any other active instance. For XML, as part of the patch install instructions for the 10.1.3.3 patchset, you install a "stream-based" XML parser called STaX, which is well-known in the Java community. Instead of reading the whole file into main memory, the STaX parser just "browses" like a lexical scanner through the XML file and does not build large trees of objects in memory.
How it works
To activate the de-batching behaviour of the adapter, you have to add the "PublishSize" attribute to the jca:operation tag in the adapter's wsdl:
<jca:operation
PhysicalDirectory="D:\"
ActivationSpec="oracle.tip.adapter.file.inbound.FileActivationSpec"
DeleteFile="true"
IncludeFiles=".*\.xml"
PublishSize="1"
PollingFrequency="10"
MinimumAge="5"
OpaqueSchema="false" >
What it will do at runtime is take your XML file, preserve the root tag, and for each child of the root, generate a separate inbound document containing the child with the root tag again wrapped around it:
<dcs320 xmlns="http://www.customer.com/schemas/dcs320/1.0">
<salesline>
<messagetype>DCS320</messagetype>
<messageid>487235</messageid>
<messagelinenumber>1</messagelinenumber>
<datamutationtype>0</datamutationtype>
<dcaddressnumber>0000410</dcaddressnumber>
</salesline>
<salesline>
<messagetype>DCS320</messagetype>
<messageid>487235</messageid>
<messagelinenumber>2</messagelinenumber>
<datamutationtype>0</datamutationtype>
<dcaddressnumber>0000411</dcaddressnumber>
</salesline>
<salesline>
<messagetype>DCS320</messagetype>
<messageid>487235</messageid>
<messagelinenumber>3</messagelinenumber>
<datamutationtype>0</datamutationtype>
<dcaddressnumber>0000412</dcaddressnumber>
</salesline>
</dcs320>
The above document will launch instances that get documents as input that look like:
<dcs320 xmlns="http://www.customer.com/schemas/dcs320/1.0">
<salesline>
<messagetype>DCS320</messagetype>
<messageid>487235</messageid>
<messagelinenumber>1</messagelinenumber>
<datamutationtype>0</datamutationtype>
<dcaddressnumber>0000410</dcaddressnumber>
</salesline>
</dcs320>
And so on, one for each <salesline> child. The ability to de-batch large XML files gives us the option to process large XML documents (that have a repeating structure) in BPEL, one piece at a time.
Process Control Challenges
The main challenge I've always had with debatching or other "parallel processing" patterns is to stay in control regarding error handling and monitoring progress & completion. Especially for non-transactional protocols like File and FTP, this can be tricky. Another neat feature that has been built into the 10.1.3.3 adapter is a batch notification mechanism. This means that at certain points in its processing, the adapter will post meta-data events about its progress to an event handler, typically another BPEL process. You configure this notification behaviour in the activationAgent section of the bpel.xml in the debatching process. Below an example of such a bpel.xml:
<?xml version = '1.0' encoding = 'UTF-8'?>
<BPELSuitcase>
<BPELProcess id="XMLDebatcher" src="XMLDebatcher.bpel">
<partnerLinkBindings>
<partnerLinkBinding name="debatch">
<property name="wsdlLocation">debatch.wsdl</property>
</partnerLinkBinding>
</partnerLinkBindings>
<activationAgents>
<activationAgent className="oracle.tip.adapter.fw.agent.jca.JCAActivationAgent" partnerLink="debatch">
<property name="batchNotificationHandler">bpel://default|BatchMgmtProcess</property>
<property name="portType">Read_ptt</property>
</activationAgent>
</activationAgents>
</BPELProcess>
</BPELSuitcase>
Above, the batch notification events are sent to a BPEL process called "BatchMgmtProcess", deployed in the "default" domain on the same BPEL server. The wsdl of this BatchMgmtProcess must define a number of messageTypes and operations. This is what it should look like (never mind the correlation stuff for now):
<definitions
name="BatchManagerInterface"
targetNamespace="http://xmlns.oracle.com/pcbpel/batching"
xmlns="http://schemas.xmlsoap.org/wsdl/"
xmlns:bpws="http://schemas.xmlsoap.org/ws/2003/03/business-process/"
xmlns:tns="http://xmlns.oracle.com/pcbpel/batching"
xmlns:plnk="http://schemas.xmlsoap.org/ws/2003/05/partner-link/"
xmlns:pns1="http://xmlns.oracle.com/BatchMgmtProcess/correlationset"
xmlns:batch="http://xmlns.oracle.com/pcbpel/batching/types"
>
<import namespace="http://xmlns.oracle.com/BatchMgmtProcess/correlationset" location="BatchMgmtProcess_Properties.wsdl"/>
<types>
<schema attributeFormDefault="qualified" elementFormDefault="qualified" targetNamespace="http://xmlns.oracle.com/pcbpel/batching/types"
xmlns:tns="http://xmlns.oracle.com/pcbpel/batching/types" xmlns="http://www.w3.org/2001/XMLSchema">
<element name="batchReadInitiateElement" type="tns:batchReadInitiateType"/>
<complexType name="batchReadInitiateType">
<sequence>
<element name="batchId" type="string"/>
<element name="batchMetaData" type="string"/>
<element name="batchDescription" type="string"/>
<element name="process" type="string"/>
<element name="domain" type="string"/>
</sequence>
</complexType>
<element name="batchReadCompleteElement" type="tns:batchReadCompleteType"/>
<complexType name="batchReadCompleteType">
<sequence>
<element name="batchId" type="string"/>
<element name="batchMetaData" type="string"/>
<element name="batchDescription" type="string"/>
<element name="batchExpectedSize" type="long"/>
<element name="process" type="string"/>
<element name="domain" type="string"/>
</sequence>
</complexType>
<element name="batchProcessCompleteElement" type="tns:batchProcessCompleteType"/>
<complexType name="batchProcessCompleteType">
<sequence>
<element name="batchId" type="string"/>
<element name="batchMetaData" type="string"/>
<element name="batchDescription" type="string"/>
<element name="batchFinalSize" type="long"/>
<element name="process" type="string"/>
<element name="domain" type="string"/>
</sequence>
</complexType>
<element name="batchReadFailureElement" type="tns:batchReadFailureType"/>
<complexType name="batchReadFailureType">
<sequence>
<element name="batchId" type="string"/>
<element name="batchMetaData" type="string"/>
<element name="batchDescription" type="string"/>
<element name="batchPartialSize" type="long"/>
<element name="process" type="string"/>
<element name="domain" type="string"/>
</sequence>
</complexType>
</schema>
</types>
<message name="batchReadInitiateMessage">
<part name="event" element="batch:batchReadInitiateElement"/>
</message>
<message name="batchReadCompleteMessage">
<part name="event" element="batch:batchReadCompleteElement"/>
</message>
<message name="batchProcessCompleteMessage">
<part name="event" element="batch:batchProcessCompleteElement"/>
</message>
<message name="batchReadFailureMessage">
<part name="event" element="batch:batchReadFailureElement"/>
</message>
<portType name="BatchManagerInterface">
<operation name="onBatchReadStart">
<input message="tns:batchReadInitiateMessage"/>
</operation>
<operation name="onBatchReadComplete">
<input message="tns:batchReadCompleteMessage"/>
</operation>
<operation name="onBatchProcessComplete">
<input message="tns:batchProcessCompleteMessage"/>
</operation>
<operation name="onBatchReadFailure">
<input message="tns:batchReadFailureMessage"/>
</operation>
</portType>
<plnk:partnerLinkType name="BatchManagerInterfacePartnerLinkType">
<plnk:role name="BatchManagerInterfaceRole">
<plnk:portType name="tns:BatchManagerInterface"/>
</plnk:role>
</plnk:partnerLinkType>
<bpws:propertyAlias propertyName="pns1:batchId" messageType="tns:batchReadCompleteMessage" part="event"
query="/batch:batchReadCompleteElement/batch:batchId"/>
<bpws:propertyAlias propertyName="pns1:batchId" messageType="tns:batchReadInitiateMessage" part="event"
query="/batch:batchReadInitiateElement/batch:batchId"/>
<bpws:propertyAlias propertyName="pns1:batchId" messageType="tns:batchReadFailureMessage" part="event"
query="/batch:batchReadFailureElement/batch:batchId"/>
</definitions>
The value of the batchId field is generated at runtime by the adapter, and can for example be used to initiate a correlationSet. You can then later on receive (e.g. using a Pick with correlation) a batchReadComplete or batchReadFailure message for the same file in the same monitoring instance. Here’s a fragment of what it could look like:
<correlationSets>
<correlationSet name="CorrelationSet_1" properties="ns3:batchId"/>
</correlationSets>
<sequence name="main">
<pick name="Pick_1" createInstance="yes">
<onMessage portType="ns1:BatchManagerInterface"
operation="onBatchReadStart"
variable="OnMessage_onBatchReadStart_InputVariable"
partnerLink="BatchManagerInterface">
<correlations>
<correlation initiate="yes" set="CorrelationSet_1"/>
</correlations>
10 comments:
Any idea how to implement the batch notification handler process in 11g?
Hi,
This post is very useful. Can you please tell me how to configure "batchNotificationHandler" in SOA 11g.
Thanks,
Vidya
I am really sorry to reply so late but you do using this java code.
Configuring Batch Notification Handler
The BatchNotificationHandler API is used in conjunction with the Oracle File and FTP Adapter inbound de-batchability. In a de-batching scenario, each file contains multiple messages, and some sort of bookkeeping is required for crash-recovery. This is facilitated by the BatchNotificationHandler API, which lets you receive notification from the pipeline whenever a batch begins, occurs, or ends. The following is the BatchNotificationHandler interface:
package oracle.tip.pc.services.pipeline;
/*
* Whenever the caller processes de-batchable files, each file can
* have multiple messages and this handler allows the user to plug in
* a notification mechanism into the pipeline.
*
* This is particularly useful in crash recovery situations
*/
public interface BatchNotificationHandler {
/*
* The Pipeline instance is set by the PipelineFactory when the
* BatchNotificationHandler instance is created
*/
public void setPipeline(Pipeline pipeline);
public Pipeline getPipeline();
/*
* Called when the BatchNotificationHandler is instantiated
*/
public void initialize();
/*
* Called by the adapter when a batch begins, the implementation must return
* a BatchContext instance with the following information:
* i) batchId: a unique id that will be returned every time onBatch is invoked by called
* ii)line/col/record/offset: for error recovery cases
*/
public BatchContext onBatchBegin();
/*
* Called by the adapter when a batch is submitted. The parameter holds the
* line/column/record/offset for the successful batch that is published.
* Here the implementation must save these in order to recover from crashes
*/
public void onBatch(BatchContext ctx);
/*
* Called by the adapter when a batch completes. This must be used to clean
* up
*/
public void onBatchCompletion(boolean success);
}
To use a pipeline with de-batching, you must configure the pipeline with a BatchNotificationHandler instance as follows:
valves.SimpleUnzipValve
valves.SimpleDecryptValve
How would you parse XML files with Oracle Service Bus when no BPEL engine is available?
You can parse xml file using OSB functions but you can not debatch it as you to make sure your debatching file is fully qualified xml file.
when i say you can not debatch i mean difficult to debatch based on elements.
Because if debatched file is not fully qualified then your function will error out.
Abhishek,
Thanks for your answer, but I am still struggling to find an example of processing an inbound XML files with OSB 11g. Went all over the docs. Is there a Flat File Adapter? The Oracle docs points to the flat file adapter for the old ESB, but not the new OSB. The OSB developer manual talk about JMS, FTP and other protocols, but not flat file.
Related question - what if the XML file has multiple records. How would I process those?
OSB 11g supoorts Oracle JCA Adapter for Files/FTP (Oracle File and FTP Adapters). You need to configure the adapter in JDev and then import the JCA and WSDL files into OSB configuration. Using JCA file, a business/proxy service can be generated.
To know more about configuration of JCA File/FTP adapter, please refer -
http://download.oracle.com/docs/cd/E14571_01/integration.1111/e10231/adptr_file.htm#BABCJJCD
To know that how to use JCA adapter with OSB, please refer -
http://blogs.oracle.com/middleware/2010/05/using_jca_adapter_with_osb_11113.html
On the above blog, James has used JCA dbadapter but for file adapter also, similar steps should be followed.
To make it more clear i can create a post and publish soon.
Also please note that you need to create JCA files using Jdev 1og not 11g.
I heard it gets error while deployment or creation of business service in OSB.
Please refer
http://abhishek-oracleaia-bpel-esb.blogspot.com/2010/08/osb-11g-oracle-file-adapter-debatching.html
Post a Comment