Advanced java code dealing with real world problems.

Thursday, September 10, 2009

Build a reliable email reader - Part 1

When I was building a back-end email reader that reads emails from a mailbox and save them for future processing. I encountered a problem that the back-end task would stop unexpectedly due to various reasons, such as when a malformed email was received, or the mailbox was temporary disconnected, or the pop3 or imap server was temporarily out of service for maintainance, etc.

A reliable email reader program was needed so it can be started to run continually until some catastrophic events occurred. The email reader presented here will read email messages into our portable message beans, and display them to the console. The pop3 server located on the "localhost" is used, and the mailbox name is "support" with password "support". Please change them to point to your pop3 account accordingly.

We will need two more supporting classes which I will present them in Part 2.
/*
 * blog/javaclue/javamail/MailReader.java
 * 
 * Copyright (C) 2009 JackW
 * 
 * This program is free software: you can redistribute it and/or modify it under the terms of the
 * GNU Lesser General Public License as published by the Free Software Foundation, either version 3
 * of the License, or (at your option) any later version.
 * 
 * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public License along with this library.
 * If not, see <http://www.gnu.org/licenses/>.
 */
package blog.javaclue.javamail;

import java.io.IOException;
import java.util.Date;
import java.util.Properties;

import javax.mail.Folder;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.NoSuchProviderException;
import javax.mail.Session;
import javax.mail.Store;
import javax.mail.event.ConnectionEvent;
import javax.mail.event.ConnectionListener;
import javax.mail.event.MessageCountAdapter;
import javax.mail.event.MessageCountEvent;
import javax.mail.event.StoreEvent;
import javax.mail.event.StoreListener;

import org.apache.log4j.Logger;

/**
 * This class provides methods to read e-mails from a mailbox.
 */
public class MailReader implements ConnectionListener, StoreListener {
 private static final long serialVersionUID = -9061869821061961065L;
 private static final Logger logger = Logger.getLogger(MailReader.class);
 protected static final boolean isDebugEnabled = logger.isDebugEnabled();

 protected final String LF = System.getProperty("line.separator", "\n");
 private final boolean debugSession = false;

 private final Session session;
 private final Mailbox mailbox;
 private final MailProcessor processor;
 
 private Store store = null;
 private Folder folder = null;
 
 private static final int MAX_MSGS_PER_READ = 100;
 private static final int MAX_WAIT = 120 * 1000; // up to two minutes

 private final int msgsPerRead;
 private final int pollingFreq;
 private int messagesProcessed = 0;

 private static final int[] RetryFreqs = 
  { 5, 10, 10, 20, 20, 20, 30, 30, 30, 30, 60, 60, 60, 60, 60 }; // in seconds
 private static final int RETRY_FREQ = 120; // in seconds

 public static void main(String[] args) {
  Mailbox vo = new Mailbox("localhost", "support", "support");
  MailReader reader = new MailReader(vo);
  try {
   reader.readMail();
  }
  catch (Exception e) {
   e.printStackTrace();
  }
 }

 /**
  * create a MailReader instance
  * 
  * @param mbox -
  *            mailbox properties
  */
 public MailReader(Mailbox mbox) {
  this.mailbox = mbox;

  // number of e-mails (="msgsPerPass") to read per cycle
  int msgs_per_read = mbox.getMessagesPerRead();
  msgs_per_read = msgs_per_read <= 0 ? 5 : msgs_per_read; // default is 5
  msgsPerRead = msgs_per_read;

  // number of seconds (="pollingFreq") to wait between reads
  int _freq = mbox.getMinimumWait() * 1000 + msgsPerRead * 100;
  pollingFreq = _freq > MAX_WAIT ? MAX_WAIT : _freq; // upper limit is MAX_WAIT
  if (isDebugEnabled)
   logger.debug("Wait between reads in milliseconds: " + pollingFreq);
  
  // enable RFC2231 support in parameter lists, since javamail 1.4
  // Since very few existing programs support RFC2231, disable it for now
  /*
  System.setProperty("mail.mime.encodeparameters", "true");
  System.setProperty("mail.mime.decodeparameters", "true");
  System.setProperty("mail.mime.encodefilename", "true");
  System.setProperty("mail.mime.decodefilename", "true");
  */
  
  // to make the reader more tolerable
  System.setProperty("mail.mime.multipart.ignoremissingendboundary", "true");
  System.setProperty("mail.mime.multipart.ignoremissingboundaryparameter", "true");
  
  Properties m_props = (Properties) System.getProperties().clone();
  m_props.setProperty("mail.debug", "true");
  m_props.setProperty("mail.debug.quote", "true");

  /*
   * POP3 - properties of com.sun.mail.pop3 
   * mailbox can be accessed via URL: pop3://user:password@host:port/INBOX
   */
  // set timeouts in milliseconds. default for both is infinite
  // Socket connection timeout
  m_props.setProperty("mail.pop3.connectiontimeout", "900000");
  // Socket I/O timeout
  m_props.setProperty("mail.pop3.timeout", "750000");
  // m_props.setProperty("mail.pop3.rsetbeforequit","true");
  /* issue RSET before QUIT, default: false */

  /* IMAP - properties of com.sun.mail.imap */
  // set timeouts in milliseconds. default for both is infinite
  // Socket connection timeout
  m_props.setProperty("mail.imap.connectiontimeout", "900000");
  // Socket I/O timeout
  m_props.setProperty("mail.imap.timeout", "750000");
  
  // Certain IMAP servers do not implement the IMAP Partial FETCH
  // functionality properly
  // set Partial fetch to false to workaround exchange server 5.5 bug
  m_props.setProperty("mail.imap.partialfetch","false");
  
  // If your version of Exchange doesn't implement POP3 properly, you need
  // to tell JavaMail to forget about TOP headers by setting the 
  // mail.pop3.forgettopheaders property to true.
  if (mbox.isExchange()) {
   m_props.setProperty("mail.pop3.forgettopheaders","true");
  }
  
  // Get a Session object
  if (mbox.isUseSsl()) {
   m_props.setProperty("mail.pop3.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
   m_props.setProperty("mail.pop3.socketFactory.fallback", "false");
   m_props.setProperty("mail.pop3.port", mbox.getPort()+"");
   m_props.setProperty("mail.pop3.socketFactory.port", mbox.getPort()+"");
   session = Session.getInstance(m_props);
  }
  else {
   session = Session.getInstance(m_props, null);
  }
  
  processor = new MailProcessor(mbox);
 }
 
 /**
  * invoke application plug-in to process e-mails.
  * 
  * @throws MessagingException
  * @throws IOException
  */
 private void readMail() throws MessagingException, IOException {
  session.setDebug(true); // DON'T CHANGE THIS
  String protocol = mailbox.getProtocol();
  if (!"imap".equalsIgnoreCase(protocol) && !"pop3".equalsIgnoreCase(protocol)) {
   throw new IllegalArgumentException("Invalid protocol " + protocol);
  }
  if (store == null) {
   try {
    // Get a Store object
    store = session.getStore(protocol);
    store.addConnectionListener(this);
    store.addStoreListener(this);
   }
   catch (NoSuchProviderException pe) {
    logger.fatal("NoSuchProviderException caught during session.getStore()", pe);
    throw pe;
   }
  }
  try {
   connect(store, 0, mailbox.getMaxRetries()); // could fail due to authentication error
   folder = getFolder(store, 0, 1); // retry once on folder
   // reset debug mode
   session.setDebug(debugSession);
   if ("imap".equalsIgnoreCase(protocol)) {
    // only IMAP support MessageCountListener
    final String _folder = mailbox.getFolderName();
    // Add messageCountListener to listen to new messages from IMAP server
    addMsgCountListener(folder, _folder);
   }
   if ("pop3".equalsIgnoreCase(protocol)) {
    readFromPop3();
   }
   else if ("imap".equalsIgnoreCase(protocol)) {
    readFromImap();
   }
  }
  catch (InterruptedException e) {
   logger.warn("InterruptedException caught, exiting...", e);
  }
  finally {
   try {
    if (folder != null && folder.isOpen()) {
     folder.close(false);
    }
    store.close();
   }
   catch (Exception e) {
    logger.error("Exception caught", e);
   }
  }
  if (isDebugEnabled)
   logger.debug("MailReader ended");
 }

 private void readFromPop3() throws InterruptedException, MessagingException, IOException {
  final String _user = mailbox.getUserId();
  final String _host = mailbox.getHost();
  final String _folder = mailbox.getFolderName();
  boolean keepRunning = true;
  int retries = 0;
  do {
   try {
    if (folder.isOpen()) {
     folder.close(false);
    }
   }
   catch (MessagingException em) {
    logger.error("MessagingException caught during folder.close()", em);
   }
   try {
    Thread.sleep(pollingFreq); // exit if interrupted
    // reopen the folder in order to pick up the new messages
    folder.open(Folder.READ_WRITE);
   }
   catch (MessagingException e) {
    logger.error("Failed to open folder " + _user + "@" + _host + ":" + _folder);
    logger.error("MessagingException caught", e);
    if (retries++ < mailbox.getMaxRetries() || mailbox.getMaxRetries() < 0) {
     int sleepFor;
     // wait for a while and try to reopen the folder
     if (retries < RetryFreqs.length) {
      sleepFor = RetryFreqs[retries];
     }
     else {
      sleepFor = RETRY_FREQ;
     }
     logger.error("Exception caught during folder.open(), retry(=" + retries
       + ") in " + sleepFor + " seconds");
     Thread.sleep(sleepFor * 1000);
      // terminate if interrupted
     continue;
    }
    else {
     logger.fatal("All retries failed for " + _user + "@" + _host + ":" + _folder);
     throw e;
    }
   }
   if (retries > 0) {
    logger.warn("Opened " + _user + "@" + _host + ":" + _folder + " after " + retries + " retries");
    retries = 0; // reset retry counter
   }
   Date start_tms = new Date();
   int msgCount;
   if ((msgCount = folder.getMessageCount()) > 0) {
    logger.info(mailbox.getUserId() + "'s " + _folder + " has " + msgCount + " messages.");
    // "msgsPerRead" is used so the flagged messages will be purged more often
    int msgsToRead = Math.min(msgCount, msgsPerRead);
    // if we can't keep up, process more messages in each cycle
    if (msgCount > msgsToRead * 50) {
     msgsToRead *= 50;
    }
    else if (msgCount > msgsToRead * 10) {
     msgsToRead *= 10;
    }
    else if (msgCount > msgsToRead * 5) {
     msgsToRead *= 5;
    }
    msgsToRead = msgsToRead > MAX_MSGS_PER_READ ? MAX_MSGS_PER_READ : msgsToRead;
    logger.info("number of messages to be read in this cycle: " + msgsToRead);
    Message[] msgs = null;
    try {
     msgs = folder.getMessages(1, msgsToRead);
    }
    catch (IndexOutOfBoundsException ie) {
     logger.error("IndexOutOfBoundsException caught, retry with getMessages()", ie);
     msgs = folder.getMessages();
     logger.info("Retry with folder.getMessages() is successful.");
    }
    execute(msgs); // process the messages read
    folder.close(true); // "true" to delete the flagged messages
    logger.info(msgs.length + " messages have been purged from pop3 mailbox.");
    messagesProcessed += msgs.length;
    long proc_time = new Date().getTime() - start_tms.getTime();
    if (isDebugEnabled)
     logger.debug(msgs.length+ " messages read, time taken: " + proc_time);
   }
  } while (keepRunning); // end of do-while
 }
 
 private void readFromImap() throws MessagingException, InterruptedException, IOException {
  boolean keepRunning = true;
  folder.open(Folder.READ_WRITE);
  /*
   * fix for some IMAP servers: some IMAP servers wouldn't pick up the
   * existing messages, the MessageCountListener may not be implemented
   * correctly for those servers.
   */
  if (folder.getMessageCount() > 0) {
   logger.info(mailbox.getUserId() + "'s " + mailbox.getFolderName() + " has "
     + folder.getMessageCount() + " messages.");
   Date start_tms = new Date();
   Message msgs[] = folder.getMessages();
   execute(msgs);
   folder.expunge(); // remove messages marked as DELETED
   logger.info(msgs.length + " messages have been expunged from imap mailbox.");
   long proc_time = new Date().getTime() - start_tms.getTime();
   if (isDebugEnabled)
    logger.debug(msgs.length+ " messages read, time taken: " + proc_time);
  }
  /* end of the fix */
  while (keepRunning) {
   Thread.sleep(pollingFreq); // sleep for "pollingFreq"
   // This is to force the IMAP server to send us
   // EXISTS notifications.
   folder.getMessageCount();
  }
 }
 
 /**
  * Add messageCountListener to listen to new messages for IMAP.
  * 
  * @param folder -
  *            a Folder object
  * @param _folder -
  *            folder name
  */
 private void addMsgCountListener(final Folder folder, final String _folder) {
  folder.addMessageCountListener(new MessageCountAdapter() {
   private final Logger logger = Logger.getLogger(MessageCountAdapter.class);
   public void messagesAdded(MessageCountEvent ev) {
    Message[] msgs = ev.getMessages();
    logger.info("Got " + msgs.length + " new messages from " + _folder);
    Date start_tms = new Date();
    try {
     execute(msgs);
     folder.expunge(); // remove messages marked as DELETED
     logger.info(msgs.length + " messages have been expunged from imap mailbox.");
     messagesProcessed += msgs.length;
    }
    catch (MessagingException ex) {
     logger.fatal("MessagingException caught", ex);
     throw new RuntimeException(ex.getMessage());
    }
    catch (IOException ex) {
     logger.fatal("IOException caught", ex);
     throw new RuntimeException(ex.getMessage());
    }
    finally {
     long proc_time = new Date().getTime() - start_tms.getTime();
     if (isDebugEnabled)
      logger.debug(msgs.length+ " messages processed, time taken: " + proc_time);
    }
   }
  }); // end of IMAP folder.addMessageCountListener
 }
 
 /*
  * process e-mails.
  * 
  * @param msgs -
  *            messages to be processed.
  * @throws MessagingException
  * @throws IOException
  */
 private void execute(Message[] msgs) throws IOException, MessagingException {
  if (msgs == null || msgs.length == 0) return;
  processor.process(msgs);
 }
 
 /**
  * implement ConnectionListener interface
  * 
  * @param e -
  *            Connection event
  */
 public void opened(ConnectionEvent e) {
  if (isDebugEnabled)
   logger.debug(">>> ConnectionListener: connection opened()");
 }

 /**
  * implement ConnectionListener interface
  * 
  * @param e -
  *            Connection event
  */
 public void disconnected(ConnectionEvent e) {
  logger.info(">>> ConnectionListener: connection disconnected()");
 }

 /**
  * implement ConnectionListener interface
  * 
  * @param e -
  *            Connection event
  */
 public void closed(ConnectionEvent e) {
  if (isDebugEnabled)
   logger.debug(">>> ConnectionListener: connection closed()");
 }

 public void notification(StoreEvent e) {
  if (isDebugEnabled)
   logger.debug(">>> StoreListener: notification event: " + e.getMessage());
 }
 
 /* end of the implementation */

 /**
  * connect to Store with retry logic.
  * 
  * @param store
  *            Store object
  * @param retries
  *            number of retries performed
  * @param maxRetries
  *            number of retries to be performed before giving up
  * @throws MessagingException 
  *             when retries reached the maxRetries
  * @throws InterruptedException 
  */
 void connect(Store store, int retries, int maxRetries) throws MessagingException,
   InterruptedException {
  int portnbr = mailbox.getPort();
  // -1 to use the default port
  if (isDebugEnabled)
   logger.debug("Port used: " + portnbr);
  if (retries > 0) { // retrying, close store first
   try {
    store.close();
   }
   catch (MessagingException e) {
    logger.error("MessagingException caught during retry on store.close()", e);
   }
  }
  try {
   // connect
   store.connect(mailbox.getHost(), portnbr, mailbox.getUserId(), mailbox.getUserPswd());
  }
  catch (MessagingException me) {
   if (retries < maxRetries || maxRetries < 0) {
    int sleepFor;
    if (retries < RetryFreqs.length) {
     sleepFor = RetryFreqs[retries];
    }
    else {
     sleepFor = RETRY_FREQ;
    }
    logger.error("MessagingException caught during store.connect, retry(=" + retries
      + ") in " + sleepFor + " seconds");
    try {
     Thread.sleep(sleepFor * 1000);
    }
    catch (InterruptedException e) {
     logger.warn("InterruptedException caught", e);
     throw e;
    }
    connect(store, ++retries, maxRetries);
   }
   else {
    logger.fatal("Exception caught during store.connect, all retries failed...");
    throw me;
   }
  }
 }

 /**
  * retrieve Folder with retry logic.
  * 
  * @param store
  *            Store object
  * @param retries
  *            number of retries performed
  * @param maxRetries
  *            number of retries to be performed before giving up
  * @return Folder instance
  * @throws MessagingException 
  * @throws InterruptedException 
  */
 Folder getFolder(Store store, int retries, int maxRetries) throws MessagingException,
   InterruptedException {
  try {
   // Open a Folder
   //folder = store.getDefaultFolder();
   folder = store.getFolder(mailbox.getFolderName());

   if (folder == null || !folder.exists()) {
    throw new MessagingException("Invalid folder " + mailbox.getFolderName());
   }
  }
  catch (MessagingException me) {
   if (retries < maxRetries || maxRetries < 0) {
    int sleepFor;
    if (retries < RetryFreqs.length) {
     sleepFor = RetryFreqs[retries];
    }
    else {
     sleepFor = RETRY_FREQ;
    }
    logger.error("MessagingException caught during store.getFolder, retry(=" + retries
      + ") in " + sleepFor + " seconds");
    try {
     Thread.sleep(sleepFor * 1000);
    }
    catch (InterruptedException e) {
     logger.warn("InterruptedException caught", e);
     throw e;
    }
    return getFolder(store, ++retries, maxRetries);
   }
   else {
    logger.fatal("Exception caught during store.getFolder, all retries failed");
    throw me;
   }
  }
  return folder;
 }
}

3 comments:

  1. thank you for this post it helped me a lot

    ReplyDelete
  2. Nice Example.. thanks was very helpful for my project... Even this http://www.compiletimeerror.com/2013/06/reading-email-using-javamail-api-example.html was also helpful... may help.. Have a look...

    ReplyDelete
  3. Thank you very much for a wonderful and informative Article. I came across a Java email component by the name of Aspose.Email for Java . Could you please share your thoughts about it?

    ReplyDelete

Followers

About Me

An IT professional with more than 20 years of experience in enterprise computing. An Audio enthusiast designed and built DIY audio gears and speakers.