View Javadoc

1   package org.apache.torque.betwixt;
2   
3   /*
4    * Copyright 2001-2006 The Apache Software Foundation.
5    *
6    * Licensed under the Apache License, Version 2.0 (the "License")
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  import java.beans.IntrospectionException;
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.io.Reader;
23  import java.sql.Connection;
24  import java.util.Hashtable;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Vector;
29  
30  import org.apache.commons.betwixt.io.BeanReader;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.torque.Torque;
34  import org.apache.torque.TorqueException;
35  import org.apache.torque.om.BaseObject;
36  import org.apache.torque.util.Transaction;
37  import org.xml.sax.ErrorHandler;
38  import org.xml.sax.InputSource;
39  import org.xml.sax.SAXException;
40  import org.xml.sax.SAXParseException;
41  
42  
43  /***
44   * Top level controller for importing XML into Torque OM objects and 
45   * storing them into the database.  Uses the org.apache.common.betwixt 
46   * package to do the conversion to objects and registered RecordHandler
47   * classes to deal with managing the DB transactions.
48   * <P>
49   * 
50   * The input XML can use the dataset attributes, action and onError, to
51   * specify what effect these objects should have on the underlying data
52   * base.  E.g., action can be: add, addUpdate, update, or delete.
53   * onError can be: continue, stop, rollback.
54   * <p>
55   * 
56   * This class configures Betwixt using a supplied Resolver to locate
57   * the required mapping file and DTD file, to so that Betwixt can convert 
58   * the XML input stream into Torque (and possibly other) objects contained
59   * in a Dataset object.
60   * <P>
61   * 
62   * Then, based on the action specified in the dataset object, it dispatches 
63   * each of the imported objects to the matching RecordHandler method for
64   * record specific processing required to perform this action on  the data 
65   * in the DB. E.g., resolving id and foreign key issues, data validation, 
66   * and the like.
67   * <p>
68   * 
69   * It also handles top level error handling and transaction grouping to 
70   * support the specified onError attribute setting.
71   * <p>
72   * 
73   * Event listeners can be registered with this class so that import
74   * status (as specified in the ACTION_* static variables) can be reported
75   * to users and/or other reporting agents.
76   * <p>
77   * 
78   *  TODO: Support for having mix of records from different databases?
79   *  
80   * @author <a href="mailto:greg.monroe@dukece.com">Greg Monroe</a>
81   */
82  public class Importer extends XMLEventNotifier implements ErrorHandler 
83  {
84      static Log logger = LogFactory.getLog(Importer.class);
85      
86      /*** Continue on error flag value */
87      public static final int ON_ERROR_CONTINUE = 0;
88      
89      /*** Stop importing on error flag value */
90      public static final int ON_ERROR_STOP = 1;
91  
92      /*** Roll back on error flag value */
93      public static final int ON_ERROR_ROLLBACK = 2;
94      
95      /*** Add or update record action flag value */
96      public static final int ADD_UPDATE = 0;
97  
98      /*** Only add records action flag value */
99      public static final int ADD_ONLY = 1;
100 
101     /*** Only update records action flag value */
102     public static final int UPDATE_ONLY = 2;
103 
104     /*** Delete records action flag value */
105     public static final int DELETE = 3;
106 
107     private Hashtable handlers; 
108     
109     private List listeners = new Vector();
110     private int onError;
111     private int action;
112     private Connection connection;
113     private boolean errorParsing;
114     private int datasetRecordNumber;
115 
116     /***
117      * Handles parsing XML records into a Dataset object and then dispatching
118      * them to the correct record handlers to be processed.
119      *
120      */
121     public Importer() 
122     {
123         initHandlers();
124     }
125     
126     /***
127      * Initialize the default set of record handlers when class is
128      * created.  Handlers can be changed/added via registerHandler method.
129      */
130     public void initHandlers() 
131     {
132     }
133     
134     /***
135      * Create a BeanReader; parse the supplied XML input stream into a
136      * Dataset; and then process the Dataset.
137      * 
138      * @param xmlReader The XML input source.
139      * @param resolver A resolver object to locate needed information.
140      * @throws SAXException
141      * @throws IOException
142      * @throws Exception
143      */
144     public void importData( Reader xmlReader, Resolver resolver ) 
145                                                         throws Exception 
146     {
147         notifyListeners(0, IMPORT_START,
148                                     "Import of record set started.");
149         
150         BeanReader beanReader = createBeanReader( resolver );
151         Dataset dataset = parseXML( xmlReader, beanReader );
152         if ( dataset == null ) 
153         {
154             return;
155         } 
156         processDataset( dataset );
157     }
158     
159     /***
160      * Populate a Dataset object from the supplied XML input stream.
161      * 
162      * @param importReader
163      *            The XML input stream
164      * @param beanReader 
165      *            The betwixt BeanReader configured to parse the XML input.
166      * @throws IntrospectionException
167      * @throws IOException
168      * @throws SAXException
169      */
170     public Dataset parseXML ( Reader importReader, BeanReader beanReader )
171                 throws IntrospectionException, IOException, SAXException,
172                         Exception
173     {
174         Dataset results = null;
175         setErrorParsing(false);
176         try 
177         {
178             results = (Dataset) beanReader.parse(importReader);
179         } 
180         catch ( FileNotFoundException e ) 
181         {
182             String eMsg = "A required file was not found! This could be a " + 
183                           "problem reading the XML file or an incorrect (or " +
184                           "missing) DOCTYPE statement in the XML file"; 
185             notifyListeners(0,ACTION_ERROR, eMsg);
186             logger.warn("Exception Details",e);
187             return null;
188         }
189         catch ( Exception e ) 
190         {
191             String eMsg = "There was a problem importing the data.  " +
192                            "No data was imported.  The error given was:\n" + 
193                            e.getClass().getName()+": " + e.getMessage(); 
194             notifyListeners(0,ACTION_ERROR, eMsg);
195             logger.warn("Exception Details",e);
196             return null;
197         }
198         if ( isErrorParsing() ) 
199         {
200             String eMsg = "There was a problem parsing the import data.  " +
201                 "No data was imported.  See previous messages for details.";
202             notifyListeners(0,ACTION_ERROR, eMsg);
203             return null;
204         }
205         importReader.close();
206         return results;
207     }
208     
209     /***
210      * Create a Betwixt BeanReader to parse XML with. Validating is set
211      * to true and this class is the error handler.
212      * 
213      * @param resolver
214      *            A class used to locate DTD and mapping files needed to 
215      *            validate and convert the input XML into objects..  Cannot 
216      *            be null.
217      * @return A Betwixt BeanReader object with the mapping file registered and
218      *          syntax error checking turned on.
219      * @throws IntrospectionException
220      * @throws IOException
221      * @throws SAXException
222      */
223     public BeanReader createBeanReader( Resolver resolver  ) 
224                 throws IntrospectionException, IOException, SAXException 
225     {
226         InputSource mapSrc = null;
227         try 
228         { 
229             mapSrc = resolver.resolveBetwixtFile();
230         }
231         catch ( IOException e ) 
232         {
233             throw e;
234         }
235         catch ( Exception e ) 
236         {
237             throw new SAXException(
238                     "An error occured while resolving the betwixt map file",e);
239         }
240         
241         BeanReader beanReader = new BeanReader();
242         
243         beanReader.setEntityResolver(resolver);
244         try 
245         {
246             beanReader.registerMultiMapping(mapSrc);
247         }
248         catch (SAXParseException e )
249         {
250             String msg = "Betwixt mapping file, '"
251                          + resolver.getBewtixtFileName() + 
252                          "' could not be parsed!\n Parsing error was: \n" + 
253                          "Error at line " +e.getLineNumber() + 
254                          " column "+e.getColumnNumber() +": " + e.getMessage();
255             logger.error(msg, e);
256             throw new SAXException(msg, e);
257         }
258         
259         beanReader.setValidating( true );
260         beanReader.setErrorHandler( this );
261         
262         return beanReader;
263     }
264     
265     /***
266      * The main driver for importing data into a Torque managed database from
267      * XML.
268      * Calls the appropriate RecordHandler implimentation method for each item 
269      * in the dataset based on class (or superclass) and the options set in 
270      * the dataset.  Handles notifying registered listeners of import status.
271      * Manages DB transaction logic (connection, commit, rollback, etc).
272      * 
273      * @param records The dataset of records to process.
274      * @throws Exception
275      */
276     public void processDataset( Dataset records ) 
277                                         throws Exception 
278     {
279         if ( records == null ) 
280         {
281             throw new IllegalArgumentException(
282                                     "Importer.processDataSet(Dataset): " + 
283                                     "Dataset argument cannot be null.");
284         }
285         initOptions( records );
286          
287         Iterator items = records.iterator();
288         boolean first = true;
289         int recNum = 0;
290         setDatasetRecordNumber(recNum);
291         while ( items.hasNext() ) 
292         {
293             Object record = items.next();
294             recNum = getDatasetRecordNumber()+1;
295             setDatasetRecordNumber(recNum);
296             
297             if ( first ) 
298             {
299                 startTransaction( record );
300                 first = false;
301             }
302             try 
303             {
304                 RecordHandler handler = findRecordHandler(record);
305                 if ( handler != null ) 
306                 {
307                     // Note:  Some listeners parse this message by the
308                     // single quotes around parameters. So be carefull
309                     // in changing format.
310                     notifyListeners( recNum, ACTION_TYPE,
311                                     "Processing record type: '" + 
312                                     simpleClassName(record.getClass()) + 
313                                     "' with handler: '" + 
314                                     simpleClassName(handler.getClass()) + "'");
315                     
316                     if (  getAction() != DELETE ) 
317                     {
318                         handler.addUpdateRecord(record, this );
319                     }
320                     else 
321                     {
322                         handler.deleteRecord( record, this );
323                     }
324                 } 
325                 else 
326                 {
327                     String eMsg = "No handler registered for record type of " 
328                                   +  record.getClass().getName();
329                     notifyListeners( recNum, ACTION_ERROR, eMsg );
330                     throw new IllegalStateException(eMsg);
331                 }
332             } 
333             catch ( Exception e ) 
334             {
335                 String eMsg = "An error occured while processing a dataset. " +
336                 "The reason given was:\n" + e.getMessage();
337                 
338                 // These exceptions indicate the handler has already 
339                 // notified listeners. 
340                 if ( ! (e.getClass().equals(IllegalStateException.class) ||
341                      e.getClass().equals(IllegalArgumentException.class )) ) 
342                 {
343                     notifyListeners(recNum, ACTION_ERROR,eMsg);
344                     logger.warn("Exception Details",e);
345                 }
346                 if ( getOnError() == ON_ERROR_CONTINUE ) 
347                 {
348                     eMsg = "Dataset processing continuing after error.";
349                     notifyListeners(recNum, ACTION_INFO,eMsg);
350                     continue;
351                 }    
352                 if ( getOnError() == ON_ERROR_ROLLBACK ) 
353                 {
354                     rollbackTransaction();
355                 }
356                 eMsg = "Dataset processing stopped by error.";
357                 notifyListeners(recNum, IMPORT_END, eMsg);
358                 break;
359             }
360         }
361         try 
362         {
363             Connection con = getConnection();
364             Transaction.commit( con );
365             notifyListeners(recNum, IMPORT_END, 
366                     "Record set changes have been committed.");
367             
368         } 
369         catch ( IllegalStateException e ) 
370         {
371             // connection might be null at this state.
372         } 
373         finally 
374         {
375             if ( connection != null ) 
376             {
377                 if ( ! connection.isClosed() ) 
378                 {
379                     connection.close();
380                 }
381                 connection = null;
382             }
383         }
384     }
385     
386     /***
387      * Parses the text version of the Dataset action and onError attributes
388      * into option flag values.  Text options are case insensitive.
389      * 
390      * @param records The dataset with the options
391      * @throws IllegalArgumentException If an unknown value is found.
392      */
393     protected void initOptions( Dataset records ) 
394                             throws IllegalArgumentException, Exception
395     {
396         
397         String action =  records.getAction();
398         if ( action == null ) 
399         {
400             setAction ( ADD_UPDATE );
401         } 
402         else 
403         {
404             action = action.toLowerCase();
405             if ( action.equals("addupdate")) 
406             {
407                 setAction( ADD_UPDATE );
408             } 
409             else if ( action.equals("add"))  
410             {
411                 setAction( ADD_ONLY );
412             } 
413             else if ( action.equals("update")) 
414             {
415                 setAction( UPDATE_ONLY );
416             } 
417             else if ( action.equals("delete")) 
418             {
419                 setAction( DELETE );
420             } 
421             else 
422             {
423                 String eMsg = 
424                     "Invalid Dataset action attribute value found on import!" +
425                     "Value='" + action + "'";
426                 notifyListeners(getDatasetRecordNumber(),ACTION_ERROR, eMsg );
427                 throw new IllegalArgumentException(eMsg);
428             }
429         }
430         String onError = records.getOnError();
431         if ( onError == null ) 
432         {
433             setOnError(ON_ERROR_CONTINUE );
434         } 
435         else 
436         {
437             onError = onError.toLowerCase();
438             if ( onError.equals("continue")) 
439             {
440                 setOnError ( ON_ERROR_CONTINUE ); 
441             } 
442             else if ( onError.equals("stop")) 
443             {
444                 setOnError ( ON_ERROR_STOP ); 
445             } 
446             else if ( onError.equals("rollback")) 
447             {
448                 setOnError ( ON_ERROR_ROLLBACK ); 
449             } 
450             else 
451             {
452                 String eMsg = 
453                   "Invalid Dataset 'onError' attribute value found on import!"+
454                   "Value='"+onError+"'";
455                 notifyListeners( getDatasetRecordNumber(), ACTION_ERROR, eMsg );
456                 throw new IllegalArgumentException(eMsg);
457             }
458         }
459         logger.debug("action="+action+" getAction()="+getAction());
460         logger.debug("onError="+onError+" getOnError()="+getOnError());
461     }
462     
463     /***
464      * Start a transaction grouping.
465      * 
466      * @param record
467      * @throws TorqueException
468      */
469     protected void startTransaction( Object record ) 
470                                                 throws TorqueException 
471     {
472         String dbName = null;
473         if ( record.getClass() == BaseObject.class ) 
474         {
475             dbName = ((BaseObject) record).getTableMap().
476                                                 getDatabaseMap().getName();
477         }
478         else 
479         {
480             dbName = Torque.getDefaultDB();
481         }
482         setConnection(Transaction.begin(dbName));
483     }
484     
485     /***
486      * Search the registered record handlers for one that matches
487      * the class or superclass of the supplied record.
488      * 
489      * @param record The record to process.
490      * @return A clone of the registered record handler object to process 
491      *         this object type or null if none found.
492      * @throws CloneNotSupportedException 
493      */
494     protected RecordHandler findRecordHandler( Object record ) 
495                                             throws CloneNotSupportedException 
496     {
497         Class rClass = record.getClass();
498         RecordHandler rHandler = null;
499         while ( rClass != null  ) 
500         {
501             rHandler = (RecordHandler) getHandlers().get(rClass);
502             if ( rHandler != null ) 
503             {
504                 return (RecordHandler) rHandler.clone();
505             }
506             rClass = rClass.getSuperclass();
507         }
508         return null;
509     }
510     
511     /***
512      * Attempt to roll back all changes made prior to an error occuring.
513      */
514     protected void rollbackTransaction( ) throws Exception 
515     {
516         
517         notifyListeners( getDatasetRecordNumber(), ACTION_INFO,
518                             "Rolling back data already processed." );
519         
520         try 
521         {
522             Connection con = getConnection();
523             Transaction.rollback(con);
524             setConnection(null);
525             notifyListeners( getDatasetRecordNumber(), ACTION_INFO,
526                                             "The rollback was successful." );
527         } 
528         catch (TorqueException e) 
529         {
530             String eMsg = "An error occured during rollback.";
531             notifyListeners( getDatasetRecordNumber(), ACTION_ERROR, eMsg );
532         }
533     }
534     
535     /***
536      * Get the connection to save information with.  
537      * Note: To support rollbacks, all records are stored via a single 
538      * transaction wrapped connection. 
539      * 
540      * @return The transaction connection object. 
541      * @throws IllegalStateException if has not been initialized
542      */
543     public Connection getConnection( ) 
544                           throws IllegalStateException  
545     {
546         if ( this.connection == null ) 
547         {
548             String eMsg = 
549                 "Attempted to get the db connection before it was initialized!";
550             throw new IllegalStateException (eMsg); 
551         }
552         return connection;
553     }
554     
555     /***
556      * Set the database connection to use.
557      * 
558      * @param connection The connection to set.
559      */
560     protected void setConnection(Connection connection) 
561     {
562         this.connection = connection;
563     }
564     
565     /***
566      * Calls all registered listeners to let them know an action occured. 
567      * Notification will occure in the order they were registered.
568      * 
569      * @param action 
570      * @param info
571      */
572     public void notifyListeners(int recordNum, String action, String info )
573                                                             throws Exception
574     {
575         logger.info( "Record Number="+recordNum+" "+action+": "+info );
576         synchronized ( listeners ) 
577         {
578             Iterator iList = listeners.iterator();
579             while ( iList.hasNext() ) 
580             {
581                 XMLEventListener l = (XMLEventListener) iList.next();
582                 l.actionPerformed(recordNum, action, info );
583             }
584         }
585     }
586     
587     /***
588      * Register a listener that will be notified of export actions. 
589      * 
590      * @param listener
591      */
592     public synchronized void registerListener( XMLEventListener listener ) 
593     {
594         if ( ! getListeners().contains(listener)) 
595         {
596             getListeners().add(listener);
597         }
598     }
599     
600     /***
601      * Remove a listener. 
602      * 
603      * @param listener
604      */
605     public synchronized void deregisterListener( XMLEventListener listener ) 
606     {
607         getListeners().remove(listener);
608     }
609     
610     /***
611      * Gets the registered listeners.
612      * 
613      * @return A list of registered listeners.
614      */
615     protected synchronized List getListeners() 
616     {
617         if ( listeners == null ) 
618         {
619             listeners = new Vector();
620         }
621         return listeners;
622     }
623     
624     /***
625      * Register or modify a recordHandler to be used with the specifed
626      * class or subclass.
627      * 
628      * @param recordType
629      * @param handler
630      */
631     public synchronized void  registerHandler( Class recordType, 
632                                                RecordHandler handler )
633     {
634         if ( handlers == null ) 
635         {
636             handlers = new Hashtable();
637         }
638         this.handlers.put(recordType, handler );
639     }
640     
641 
642     /***
643      * Get the record handlers hash table.
644      * 
645      * @return Returns the handlers.
646      */
647     protected synchronized Map getHandlers() 
648     {
649         return handlers;
650     }
651 
652     /***
653      * Set a new record handler hash table.
654      * 
655      * @param handlers The handlers to set.
656      */
657     protected synchronized void setHandlers(Hashtable handlers) 
658     {
659         this.handlers = handlers;
660     }
661 
662     /***
663      * @return Returns the action.
664      */
665     public int getAction() 
666     {
667         return action;
668     }
669 
670     /***
671      * @param action The action to set.
672      */
673     protected void setAction(int action) 
674     {
675         this.action = action;
676     }
677 
678     /***
679      * @return Returns the onError.
680      */
681     public int getOnError() 
682     {
683         return onError;
684     }
685 
686     /***
687      * @param onError The onError to set.
688      */
689     protected void setOnError(int onError) 
690     {
691         this.onError = onError;
692     }
693 
694     public void warning(SAXParseException e) throws SAXException 
695     {
696         try 
697         {
698             notifyListeners( 0, ACTION_ERROR, 
699                 "XML Parser Warning: " + e.getMessage() + 
700                 "\n\toccured at line " + e.getLineNumber() + 
701                 ", column " + e.getColumnNumber() + " in entity " +
702                 e.getSystemId());
703         } 
704         catch (Exception e1 ) 
705         {
706             String eMsg = "Notify listener threw Exception: " + 
707                                 e.getMessage();
708             logger.error(eMsg, e1 );
709             throw new RuntimeException(eMsg, e1);
710         }
711         setErrorParsing(true);
712     }
713 
714     public void error(SAXParseException e) throws SAXException 
715     {
716         try 
717         {
718             notifyListeners( 0, ACTION_ERROR, "XML Parser Error: " + 
719                     e.getMessage() + "\n\toccured at line " + 
720                     e.getLineNumber() + ", column " + e.getColumnNumber() + 
721                     " in entity " + e.getSystemId());
722         } 
723         catch (Exception e1 ) 
724         {
725             String eMsg = "Notify listener threw Exception: " + 
726                                 e.getMessage();
727             logger.error(eMsg, e1 );
728             throw new RuntimeException(eMsg, e1);
729         }
730         setErrorParsing(true);
731     }
732 
733     public void fatalError(SAXParseException e) throws SAXException 
734     {
735         try 
736         {
737             notifyListeners( 0, ACTION_ERROR, "XML Parser Fatal Error: " + 
738                 e.getMessage() + "\n\toccured at line " + e.getLineNumber() + 
739                 ", column " + e.getColumnNumber() + " in entity " +
740                 e.getSystemId());
741         } 
742         catch (Exception e1 ) 
743         {
744             String eMsg = "Notify listener threw Exception: " + 
745                                 e.getMessage();
746             logger.error(eMsg, e1 );
747             throw new RuntimeException(eMsg, e1);
748         }
749         throw e;
750     }
751 
752     /***
753      * @return Returns the errorParsing.
754      */
755     protected boolean isErrorParsing() 
756     {
757         return errorParsing;
758     }
759 
760     /***
761      * @param errorParsing The errorParsing to set.
762      */
763     protected void setErrorParsing(boolean errorParsing) 
764     {
765         this.errorParsing = errorParsing;
766     }
767     
768     /***
769      * Get the name of a class without it's package.
770      * 
771      * @param c A class object to find the simple name of.
772      * @return The simple class name.
773      */
774     public String simpleClassName( Class c ) 
775     {
776         String qualifiedName = c.getName();
777         return qualifiedName.substring(qualifiedName.lastIndexOf('.')+1);
778     }
779 
780     /***
781      * Get the (1 based) number of the current record object being 
782      * processed.  Note: Record 0 = setup/xml parsing phase.
783      * 
784      * @return Returns the datasetRecordNumber.
785      */
786     public int getDatasetRecordNumber() 
787     {
788         return datasetRecordNumber;
789     }
790 
791     /***
792      * Sets the (1 based) number of the current record object being 
793      * processed.
794      * 
795      * @param datasetRecordNumber The datasetRecordNumber to set.
796      */
797     public void setDatasetRecordNumber(int datasetRecordNumber) 
798     {
799         this.datasetRecordNumber = datasetRecordNumber;
800     }
801 }