package org.jboss.net.axis.transport.mailto.client;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.mail.Flags;
import javax.mail.Message;
import javax.mail.MessagingException;
import org.apache.axis.EngineConfiguration;
import org.apache.axis.MessageContext;
import org.apache.axis.client.Call;
import org.apache.axis.client.Service;
import org.apache.axis.client.async.AsyncCall;
import org.apache.axis.client.async.IAsyncCallback;
import org.apache.axis.client.async.IAsyncResult;
import org.apache.axis.client.async.Status;
import org.apache.axis.message.addressing.AddressingHeaders;
import org.apache.axis.message.addressing.Constants;
import org.apache.axis.message.addressing.MessageID;
import org.jboss.net.axis.server.JMXEngineConfigurationFactory;
import org.jboss.net.axis.transport.mailto.client.AsyncMailClientServiceMBean;
import org.jboss.net.axis.transport.mailto.AbstractMailTransportService;
import org.jboss.net.axis.transport.mailto.MailConstants;
public class AsyncMailClientService extends AbstractMailTransportService implements AsyncMailClientServiceMBean,
MailConstants
{
private HashMap results = new HashMap();
public Service getService()
{
EngineConfiguration config = JMXEngineConfigurationFactory.newJMXFactory(getEngineName()).getClientEngineConfig();
Service service = new Service(config);
return service;
}
public void sendAsynchronously(Call call, IAsyncCallback callback, Object[] args)
{
AsyncCall aCall = new AsyncCall(call, callback);
IAsyncResult result = aCall.invoke(args);
results.put(call.getMessageContext(), result);
}
protected void processMessages(Message[] msgs)
{
if (log.isDebugEnabled())
log.debug("Entering: processMessages");
for (Iterator iter = results.entrySet().iterator(); iter.hasNext();)
{
Map.Entry entry = (Map.Entry) iter.next();
IAsyncResult result = (IAsyncResult) entry.getValue();
StringBuffer buf = new StringBuffer("\nRemoving the following messages:");
if (result.getStatus() != Status.NONE)
{
iter.remove();
if (log.isDebugEnabled())
{
MessageContext mc = (MessageContext) entry.getKey();
String msgID = getMessageID(mc);
buf.append("\n\t" + msgID + "\t" + result.getStatus().toString());
}
}
if (log.isDebugEnabled())
{
buf.append("\n\t");
log.debug(buf.toString());
}
}
HashMap contexts = new HashMap(results.size());
for (Iterator iter = results.keySet().iterator(); iter.hasNext();)
{
MessageContext mc = (MessageContext) iter.next();
String msgID = getMessageID(mc);
StringBuffer buf = new StringBuffer("\nThe following messages are waiting for responses:");
if (msgID != null)
{
if (log.isDebugEnabled())
buf.append("\n\t" + msgID);
contexts.put(msgID, mc);
}
if (log.isDebugEnabled())
{
buf.append("\n");
log.debug(buf.toString());
}
}
for (int i = 0; i < msgs.length; i++)
{
Message message = msgs[i];
String inReplyTo = null;
try
{
inReplyTo = message.getHeader(HEADER_IN_REPLY_TO)[0];
if (log.isDebugEnabled())
log.debug("found a message inReplyTo: " + inReplyTo);
}
catch (NullPointerException e)
{
continue;
}
catch (MessagingException e)
{
log.warn(e);
continue;
}
if (contexts.containsKey(inReplyTo))
{
if (log.isDebugEnabled())
log.debug("message " + inReplyTo + " correlates to a waiting request.");
MessageContext ctx = (MessageContext) contexts.get(inReplyTo);
IAsyncResult result = (IAsyncResult) results.get(ctx);
try
{
org.apache.axis.Message soapMsg = new org.apache.axis.Message(message.getInputStream(), false, message
.getContentType(), null);
ctx.setResponseMessage(soapMsg);
ctx.setPastPivot(true);
}
catch (IOException e)
{
log.fatal(e);
}
catch (MessagingException e)
{
log.fatal(e);
}
if (getDeleteMail())
{
try
{
message.setFlag(Flags.Flag.DELETED, true);
}
catch (MessagingException e)
{
log.warn("Unable to flag the message for deletion.", e);
}
}
result.abort();
}
}
if (log.isDebugEnabled())
log.debug("Exiting: processMessages");
}
private String getMessageID(MessageContext ctx)
{
String id = "no message-id";
if (ctx.containsProperty(Constants.ENV_ADDRESSING_REQUEST_HEADERS))
{
AddressingHeaders headers = (AddressingHeaders) ctx.getProperty(Constants.ENV_ADDRESSING_REQUEST_HEADERS);
MessageID msgid = headers.getMessageID();
if (msgid != null)
{
id = msgid.toString();
}
}
return id;
}
}