1 package org.apache.torque.betwixt;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import java.beans.IntrospectionException;
20 import java.io.FileNotFoundException;
21 import java.io.IOException;
22 import java.io.Reader;
23 import java.sql.Connection;
24 import java.util.Hashtable;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Vector;
29
30 import org.apache.commons.betwixt.io.BeanReader;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.torque.Torque;
34 import org.apache.torque.TorqueException;
35 import org.apache.torque.om.BaseObject;
36 import org.apache.torque.util.Transaction;
37 import org.xml.sax.ErrorHandler;
38 import org.xml.sax.InputSource;
39 import org.xml.sax.SAXException;
40 import org.xml.sax.SAXParseException;
41
42
43 /***
44 * Top level controller for importing XML into Torque OM objects and
45 * storing them into the database. Uses the org.apache.commons.betwixt
46 * package to do the conversion to objects and registered RecordHandler
47 * classes to deal with managing the DB transactions.
48 * <P>
49 *
50 * The input XML can use the dataset attributes, action and onError, to
51 * specify what effect these objects should have on the underlying data
52 * base. E.g., action can be: add, addUpdate, update, or delete.
53 * onError can be: continue, stop, rollback.
54 * <p>
55 *
56 * This class configures Betwixt using a supplied Resolver to locate
57 * the required mapping file and DTD file, so that Betwixt can convert
58 * the XML input stream into Torque (and possibly other) objects contained
59 * in a Dataset object.
60 * <P>
61 *
62 * Then, based on the action specified in the dataset object, it dispatches
63 * each of the imported objects to the matching RecordHandler method for
64 * record specific processing required to perform this action on the data
65 * in the DB. E.g., resolving id and foreign key issues, data validation,
66 * and the like.
67 * <p>
68 *
69 * It also handles top level error handling and transaction grouping to
70 * support the specified onError attribute setting.
71 * <p>
72 *
73 * Event listeners can be registered with this class so that import
74 * status (as specified in the ACTION_* static variables) can be reported
75 * to users and/or other reporting agents.
76 * <p>
77 *
78 * TODO: Support for having mix of records from different databases?
79 *
80 * @author <a href="mailto:greg.monroe@dukece.com">Greg Monroe</a>
81 */
82 public class Importer extends XMLEventNotifier implements ErrorHandler
83 {
84 static Log logger = LogFactory.getLog(Importer.class);
85
86 /*** Continue on error flag value */
87 public static final int ON_ERROR_CONTINUE = 0;
88
89 /*** Stop importing on error flag value */
90 public static final int ON_ERROR_STOP = 1;
91
92 /*** Roll back on error flag value */
93 public static final int ON_ERROR_ROLLBACK = 2;
94
95 /*** Add or update record action flag value */
96 public static final int ADD_UPDATE = 0;
97
98 /*** Only add records action flag value */
99 public static final int ADD_ONLY = 1;
100
101 /*** Only update records action flag value */
102 public static final int UPDATE_ONLY = 2;
103
104 /*** Delete records action flag value */
105 public static final int DELETE = 3;
106
107 private Hashtable handlers;
108
109 private List listeners = new Vector();
110 private int onError;
111 private int action;
112 private Connection connection;
113 private boolean errorParsing;
114 private int datasetRecordNumber;
115
/**
 * Creates an importer and gives {@link #initHandlers()} the chance to
 * install its record handlers.
 */
public Importer()
{
    this.initHandlers();
}
125
126 /***
127 * Initialize the default set of record handlers when class is
128 * created. Handlers can be changed/added via registerHandler method.
129 */
130 public void initHandlers()
131 {
132 }
133
134 /***
135 * Create a BeanReader; parse the supplied XML input stream into a
136 * Dataset; and then process the Dataset.
137 *
138 * @param xmlReader The XML input source.
139 * @param resolver A resolver object to locate needed information.
140 * @throws SAXException
141 * @throws IOException
142 * @throws Exception
143 */
144 public void importData( Reader xmlReader, Resolver resolver )
145 throws Exception
146 {
147 notifyListeners(0, IMPORT_START,
148 "Import of record set started.");
149
150 BeanReader beanReader = createBeanReader( resolver );
151 Dataset dataset = parseXML( xmlReader, beanReader );
152 if ( dataset == null )
153 {
154 return;
155 }
156 processDataset( dataset );
157 }
158
159 /***
160 * Populate a Dataset object from the supplied XML input stream.
161 *
162 * @param importReader
163 * The XML input stream
164 * @param beanReader
165 * The betwixt BeanReader configured to parse the XML input.
166 * @throws IntrospectionException
167 * @throws IOException
168 * @throws SAXException
169 */
170 public Dataset parseXML ( Reader importReader, BeanReader beanReader )
171 throws IntrospectionException, IOException, SAXException,
172 Exception
173 {
174 Dataset results = null;
175 setErrorParsing(false);
176 try
177 {
178 results = (Dataset) beanReader.parse(importReader);
179 }
180 catch ( FileNotFoundException e )
181 {
182 String eMsg = "A required file was not found! This could be a " +
183 "problem reading the XML file or an incorrect (or " +
184 "missing) DOCTYPE statement in the XML file";
185 notifyListeners(0,ACTION_ERROR, eMsg);
186 logger.warn("Exception Details",e);
187 return null;
188 }
189 catch ( Exception e )
190 {
191 String eMsg = "There was a problem importing the data. " +
192 "No data was imported. The error given was:\n" +
193 e.getClass().getName()+": " + e.getMessage();
194 notifyListeners(0,ACTION_ERROR, eMsg);
195 logger.warn("Exception Details",e);
196 return null;
197 }
198 if ( isErrorParsing() )
199 {
200 String eMsg = "There was a problem parsing the import data. " +
201 "No data was imported. See previous messages for details.";
202 notifyListeners(0,ACTION_ERROR, eMsg);
203 return null;
204 }
205 importReader.close();
206 return results;
207 }
208
209 /***
210 * Create a Betwixt BeanReader to parse XML with. Validating is set
211 * to true and this class is the error handler.
212 *
213 * @param resolver
214 * A class used to locate DTD and mapping files needed to
215 * validate and convert the input XML into objects.. Cannot
216 * be null.
217 * @return A Betwixt BeanReader object with the mapping file registered and
218 * syntax error checking turned on.
219 * @throws IntrospectionException
220 * @throws IOException
221 * @throws SAXException
222 */
223 public BeanReader createBeanReader( Resolver resolver )
224 throws IntrospectionException, IOException, SAXException
225 {
226 InputSource mapSrc = null;
227 try
228 {
229 mapSrc = resolver.resolveBetwixtFile();
230 }
231 catch ( IOException e )
232 {
233 throw e;
234 }
235 catch ( Exception e )
236 {
237 throw new SAXException(
238 "An error occured while resolving the betwixt map file",e);
239 }
240
241 BeanReader beanReader = new BeanReader();
242
243 beanReader.setEntityResolver(resolver);
244 try
245 {
246 beanReader.registerMultiMapping(mapSrc);
247 }
248 catch (SAXParseException e )
249 {
250 String msg = "Betwixt mapping file, '"
251 + resolver.getBewtixtFileName() +
252 "' could not be parsed!\n Parsing error was: \n" +
253 "Error at line " +e.getLineNumber() +
254 " column "+e.getColumnNumber() +": " + e.getMessage();
255 logger.error(msg, e);
256 throw new SAXException(msg, e);
257 }
258
259 beanReader.setValidating( true );
260 beanReader.setErrorHandler( this );
261
262 return beanReader;
263 }
264
265 /***
266 * The main driver for importing data into a Torque managed database from
267 * XML.
268 * Calls the appropriate RecordHandler implimentation method for each item
269 * in the dataset based on class (or superclass) and the options set in
270 * the dataset. Handles notifying registered listeners of import status.
271 * Manages DB transaction logic (connection, commit, rollback, etc).
272 *
273 * @param records The dataset of records to process.
274 * @throws Exception
275 */
276 public void processDataset( Dataset records )
277 throws Exception
278 {
279 if ( records == null )
280 {
281 throw new IllegalArgumentException(
282 "Importer.processDataSet(Dataset): " +
283 "Dataset argument cannot be null.");
284 }
285 initOptions( records );
286
287 Iterator items = records.iterator();
288 boolean first = true;
289 int recNum = 0;
290 setDatasetRecordNumber(recNum);
291 while ( items.hasNext() )
292 {
293 Object record = items.next();
294 recNum = getDatasetRecordNumber()+1;
295 setDatasetRecordNumber(recNum);
296
297 if ( first )
298 {
299 startTransaction( record );
300 first = false;
301 }
302 try
303 {
304 RecordHandler handler = findRecordHandler(record);
305 if ( handler != null )
306 {
307
308
309
310 notifyListeners( recNum, ACTION_TYPE,
311 "Processing record type: '" +
312 simpleClassName(record.getClass()) +
313 "' with handler: '" +
314 simpleClassName(handler.getClass()) + "'");
315
316 if ( getAction() != DELETE )
317 {
318 handler.addUpdateRecord(record, this );
319 }
320 else
321 {
322 handler.deleteRecord( record, this );
323 }
324 }
325 else
326 {
327 String eMsg = "No handler registered for record type of "
328 + record.getClass().getName();
329 notifyListeners( recNum, ACTION_ERROR, eMsg );
330 throw new IllegalStateException(eMsg);
331 }
332 }
333 catch ( Exception e )
334 {
335 String eMsg = "An error occured while processing a dataset. " +
336 "The reason given was:\n" + e.getMessage();
337
338
339
340 if ( ! (e.getClass().equals(IllegalStateException.class) ||
341 e.getClass().equals(IllegalArgumentException.class )) )
342 {
343 notifyListeners(recNum, ACTION_ERROR,eMsg);
344 logger.warn("Exception Details",e);
345 }
346 if ( getOnError() == ON_ERROR_CONTINUE )
347 {
348 eMsg = "Dataset processing continuing after error.";
349 notifyListeners(recNum, ACTION_INFO,eMsg);
350 continue;
351 }
352 if ( getOnError() == ON_ERROR_ROLLBACK )
353 {
354 rollbackTransaction();
355 }
356 eMsg = "Dataset processing stopped by error.";
357 notifyListeners(recNum, IMPORT_END, eMsg);
358 break;
359 }
360 }
361 try
362 {
363 Connection con = getConnection();
364 Transaction.commit( con );
365 notifyListeners(recNum, IMPORT_END,
366 "Record set changes have been committed.");
367
368 }
369 catch ( IllegalStateException e )
370 {
371
372 }
373 finally
374 {
375 if ( connection != null )
376 {
377 if ( ! connection.isClosed() )
378 {
379 connection.close();
380 }
381 connection = null;
382 }
383 }
384 }
385
386 /***
387 * Parses the text version of the Dataset action and onError attributes
388 * into option flag values. Text options are case insensitive.
389 *
390 * @param records The dataset with the options
391 * @throws IllegalArgumentException If an unknown value is found.
392 */
393 protected void initOptions( Dataset records )
394 throws IllegalArgumentException, Exception
395 {
396
397 String action = records.getAction();
398 if ( action == null )
399 {
400 setAction ( ADD_UPDATE );
401 }
402 else
403 {
404 action = action.toLowerCase();
405 if ( action.equals("addupdate"))
406 {
407 setAction( ADD_UPDATE );
408 }
409 else if ( action.equals("add"))
410 {
411 setAction( ADD_ONLY );
412 }
413 else if ( action.equals("update"))
414 {
415 setAction( UPDATE_ONLY );
416 }
417 else if ( action.equals("delete"))
418 {
419 setAction( DELETE );
420 }
421 else
422 {
423 String eMsg =
424 "Invalid Dataset action attribute value found on import!" +
425 "Value='" + action + "'";
426 notifyListeners(getDatasetRecordNumber(),ACTION_ERROR, eMsg );
427 throw new IllegalArgumentException(eMsg);
428 }
429 }
430 String onError = records.getOnError();
431 if ( onError == null )
432 {
433 setOnError(ON_ERROR_CONTINUE );
434 }
435 else
436 {
437 onError = onError.toLowerCase();
438 if ( onError.equals("continue"))
439 {
440 setOnError ( ON_ERROR_CONTINUE );
441 }
442 else if ( onError.equals("stop"))
443 {
444 setOnError ( ON_ERROR_STOP );
445 }
446 else if ( onError.equals("rollback"))
447 {
448 setOnError ( ON_ERROR_ROLLBACK );
449 }
450 else
451 {
452 String eMsg =
453 "Invalid Dataset 'onError' attribute value found on import!"+
454 "Value='"+onError+"'";
455 notifyListeners( getDatasetRecordNumber(), ACTION_ERROR, eMsg );
456 throw new IllegalArgumentException(eMsg);
457 }
458 }
459 logger.debug("action="+action+" getAction()="+getAction());
460 logger.debug("onError="+onError+" getOnError()="+getOnError());
461 }
462
463 /***
464 * Start a transaction grouping.
465 *
466 * @param record
467 * @throws TorqueException
468 */
469 protected void startTransaction( Object record )
470 throws TorqueException
471 {
472 String dbName = null;
473 if ( record.getClass() == BaseObject.class )
474 {
475 dbName = ((BaseObject) record).getTableMap().
476 getDatabaseMap().getName();
477 }
478 else
479 {
480 dbName = Torque.getDefaultDB();
481 }
482 setConnection(Transaction.begin(dbName));
483 }
484
485 /***
486 * Search the registered record handlers for one that matches
487 * the class or superclass of the supplied record.
488 *
489 * @param record The record to process.
490 * @return A clone of the registered record handler object to process
491 * this object type or null if none found.
492 * @throws CloneNotSupportedException
493 */
494 protected RecordHandler findRecordHandler( Object record )
495 throws CloneNotSupportedException
496 {
497 Class rClass = record.getClass();
498 RecordHandler rHandler = null;
499 while ( rClass != null )
500 {
501 rHandler = (RecordHandler) getHandlers().get(rClass);
502 if ( rHandler != null )
503 {
504 return (RecordHandler) rHandler.clone();
505 }
506 rClass = rClass.getSuperclass();
507 }
508 return null;
509 }
510
511 /***
512 * Attempt to roll back all changes made prior to an error occuring.
513 */
514 protected void rollbackTransaction( ) throws Exception
515 {
516
517 notifyListeners( getDatasetRecordNumber(), ACTION_INFO,
518 "Rolling back data already processed." );
519
520 try
521 {
522 Connection con = getConnection();
523 Transaction.rollback(con);
524 setConnection(null);
525 notifyListeners( getDatasetRecordNumber(), ACTION_INFO,
526 "The rollback was successful." );
527 }
528 catch (TorqueException e)
529 {
530 String eMsg = "An error occured during rollback.";
531 notifyListeners( getDatasetRecordNumber(), ACTION_ERROR, eMsg );
532 }
533 }
534
535 /***
536 * Get the connection to save information with.
537 * Note: To support rollbacks, all records are stored via a single
538 * transaction wrapped connection.
539 *
540 * @return The transaction connection object.
541 * @throws IllegalStateException if has not been initialized
542 */
543 public Connection getConnection( )
544 throws IllegalStateException
545 {
546 if ( this.connection == null )
547 {
548 String eMsg =
549 "Attempted to get the db connection before it was initialized!";
550 throw new IllegalStateException (eMsg);
551 }
552 return connection;
553 }
554
555 /***
556 * Set the database connection to use.
557 *
558 * @param connection The connection to set.
559 */
560 protected void setConnection(Connection connection)
561 {
562 this.connection = connection;
563 }
564
565 /***
566 * Calls all registered listeners to let them know an action occured.
567 * Notification will occure in the order they were registered.
568 *
569 * @param action
570 * @param info
571 */
572 public void notifyListeners(int recordNum, String action, String info )
573 throws Exception
574 {
575 logger.info( "Record Number="+recordNum+" "+action+": "+info );
576 synchronized ( listeners )
577 {
578 Iterator iList = listeners.iterator();
579 while ( iList.hasNext() )
580 {
581 XMLEventListener l = (XMLEventListener) iList.next();
582 l.actionPerformed(recordNum, action, info );
583 }
584 }
585 }
586
587 /***
588 * Register a listener that will be notified of export actions.
589 *
590 * @param listener
591 */
592 public synchronized void registerListener( XMLEventListener listener )
593 {
594 if ( ! getListeners().contains(listener))
595 {
596 getListeners().add(listener);
597 }
598 }
599
600 /***
601 * Remove a listener.
602 *
603 * @param listener
604 */
605 public synchronized void deregisterListener( XMLEventListener listener )
606 {
607 getListeners().remove(listener);
608 }
609
610 /***
611 * Gets the registered listeners.
612 *
613 * @return A list of registered listeners.
614 */
615 protected synchronized List getListeners()
616 {
617 if ( listeners == null )
618 {
619 listeners = new Vector();
620 }
621 return listeners;
622 }
623
624 /***
625 * Register or modify a recordHandler to be used with the specifed
626 * class or subclass.
627 *
628 * @param recordType
629 * @param handler
630 */
631 public synchronized void registerHandler( Class recordType,
632 RecordHandler handler )
633 {
634 if ( handlers == null )
635 {
636 handlers = new Hashtable();
637 }
638 this.handlers.put(recordType, handler );
639 }
640
641
642 /***
643 * Get the record handlers hash table.
644 *
645 * @return Returns the handlers.
646 */
647 protected synchronized Map getHandlers()
648 {
649 return handlers;
650 }
651
652 /***
653 * Set a new record handler hash table.
654 *
655 * @param handlers The handlers to set.
656 */
657 protected synchronized void setHandlers(Hashtable handlers)
658 {
659 this.handlers = handlers;
660 }
661
662 /***
663 * @return Returns the action.
664 */
665 public int getAction()
666 {
667 return action;
668 }
669
670 /***
671 * @param action The action to set.
672 */
673 protected void setAction(int action)
674 {
675 this.action = action;
676 }
677
678 /***
679 * @return Returns the onError.
680 */
681 public int getOnError()
682 {
683 return onError;
684 }
685
686 /***
687 * @param onError The onError to set.
688 */
689 protected void setOnError(int onError)
690 {
691 this.onError = onError;
692 }
693
694 public void warning(SAXParseException e) throws SAXException
695 {
696 try
697 {
698 notifyListeners( 0, ACTION_ERROR,
699 "XML Parser Warning: " + e.getMessage() +
700 "\n\toccured at line " + e.getLineNumber() +
701 ", column " + e.getColumnNumber() + " in entity " +
702 e.getSystemId());
703 }
704 catch (Exception e1 )
705 {
706 String eMsg = "Notify listener threw Exception: " +
707 e.getMessage();
708 logger.error(eMsg, e1 );
709 throw new RuntimeException(eMsg, e1);
710 }
711 setErrorParsing(true);
712 }
713
714 public void error(SAXParseException e) throws SAXException
715 {
716 try
717 {
718 notifyListeners( 0, ACTION_ERROR, "XML Parser Error: " +
719 e.getMessage() + "\n\toccured at line " +
720 e.getLineNumber() + ", column " + e.getColumnNumber() +
721 " in entity " + e.getSystemId());
722 }
723 catch (Exception e1 )
724 {
725 String eMsg = "Notify listener threw Exception: " +
726 e.getMessage();
727 logger.error(eMsg, e1 );
728 throw new RuntimeException(eMsg, e1);
729 }
730 setErrorParsing(true);
731 }
732
733 public void fatalError(SAXParseException e) throws SAXException
734 {
735 try
736 {
737 notifyListeners( 0, ACTION_ERROR, "XML Parser Fatal Error: " +
738 e.getMessage() + "\n\toccured at line " + e.getLineNumber() +
739 ", column " + e.getColumnNumber() + " in entity " +
740 e.getSystemId());
741 }
742 catch (Exception e1 )
743 {
744 String eMsg = "Notify listener threw Exception: " +
745 e.getMessage();
746 logger.error(eMsg, e1 );
747 throw new RuntimeException(eMsg, e1);
748 }
749 throw e;
750 }
751
752 /***
753 * @return Returns the errorParsing.
754 */
755 protected boolean isErrorParsing()
756 {
757 return errorParsing;
758 }
759
760 /***
761 * @param errorParsing The errorParsing to set.
762 */
763 protected void setErrorParsing(boolean errorParsing)
764 {
765 this.errorParsing = errorParsing;
766 }
767
768 /***
769 * Get the name of a class without it's package.
770 *
771 * @param c A class object to find the simple name of.
772 * @return The simple class name.
773 */
774 public String simpleClassName( Class c )
775 {
776 String qualifiedName = c.getName();
777 return qualifiedName.substring(qualifiedName.lastIndexOf('.')+1);
778 }
779
780 /***
781 * Get the (1 based) number of the current record object being
782 * processed. Note: Record 0 = setup/xml parsing phase.
783 *
784 * @return Returns the datasetRecordNumber.
785 */
786 public int getDatasetRecordNumber()
787 {
788 return datasetRecordNumber;
789 }
790
791 /***
792 * Sets the (1 based) number of the current record object being
793 * processed.
794 *
795 * @param datasetRecordNumber The datasetRecordNumber to set.
796 */
797 public void setDatasetRecordNumber(int datasetRecordNumber)
798 {
799 this.datasetRecordNumber = datasetRecordNumber;
800 }
801 }