/*
 * Decompiled with CFR 0.152.
 */
package com.ejtone.mars.kernel.core.sender;

import com.ejtone.mars.kernel.core.fault.Err;
import com.ejtone.mars.kernel.core.fault.Fault;
import com.ejtone.mars.kernel.core.message.Request;
import com.ejtone.mars.kernel.core.sender.DelegateSenderService;
import com.ejtone.mars.kernel.core.sender.RequestListener;
import com.ejtone.mars.kernel.core.sender.Sender;
import com.ejtone.mars.kernel.core.sender.SenderState;
import com.ejtone.mars.kernel.core.sender.SenderStateListenerProvider;
import com.ejtone.mars.kernel.util.Timer;
import com.ejtone.mars.kernel.util.config.ConfigUtils;
import com.ejtone.mars.kernel.util.flowctrl.DummyFlowControler;
import com.ejtone.mars.kernel.util.flowctrl.FlowControler;
import com.ejtone.mars.kernel.util.monitor.ExecutorMonitor;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractSender
extends SenderStateListenerProvider
implements Sender {
    private static final Logger logger = LoggerFactory.getLogger(AbstractSender.class);
    private String name;
    private ThreadPoolExecutor threadPool;
    private FlowControler flowControler;
    private int threadPoolCoreSize = ConfigUtils.getInt("thread_pool_core_size", 2);
    private int threadPoolQueueLimit = ConfigUtils.getInt("thread_pool_queue_limit", 4096);

    public AbstractSender(String name) {
        this.name = name;
    }

    @Override
    public String getName() {
        return this.name;
    }

    public void setThreadPoolCoreSize(int threadPoolCoreSize) {
        this.threadPoolCoreSize = threadPoolCoreSize;
    }

    public void setThreadPoolQueueLimit(int threadPoolQueueLimit) {
        this.threadPoolQueueLimit = threadPoolQueueLimit;
    }

    @Override
    public void request(Request request) throws Fault {
        if (!this.isRunning()) {
            throw Err.IllegalState.makeFault();
        }
        Runnable r = this.newSenderTask(request, RequestListener.NoopListener);
        try {
            this.threadPool.execute(r);
        }
        catch (RejectedExecutionException e) {
            throw Err.SystemBusy.makeFault();
        }
    }

    @Override
    public void request(Request request, RequestListener requestListener) {
        if (!this.isRunning()) {
            requestListener.requestException(request, Err.IllegalState.makeFault());
            return;
        }
        Runnable r = this.newSenderTask(request, requestListener);
        try {
            this.threadPool.execute(r);
        }
        catch (RejectedExecutionException e) {
            requestListener.requestException(request, Err.SystemBusy.makeFault());
        }
    }

    @Override
    public void request(Request request, RequestListener requestListener, boolean blockIfRejected) {
        if (!this.isRunning()) {
            requestListener.requestException(request, Err.IllegalState.makeFault());
            return;
        }
        Runnable r = this.newSenderTask(request, requestListener);
        try {
            this.threadPool.execute(r);
        }
        catch (RejectedExecutionException e) {
            if (blockIfRejected) {
                r.run();
            }
            requestListener.requestException(request, Err.SystemBusy.makeFault());
        }
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();
        if (this.flowControler == null) {
            this.flowControler = new DummyFlowControler();
        }
        if (this.threadPool == null) {
            this.threadPool = new ThreadPoolExecutor(this.threadPoolCoreSize, this.threadPoolCoreSize, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(this.threadPoolQueueLimit));
        }
        this.threadPool.prestartAllCoreThreads();
        ExecutorMonitor.getInstance().regist(this.name, this.threadPool);
        DelegateSenderService.getInstance().regist(this);
        if (this.isAvailable()) {
            this.notifyState(SenderState.available);
        }
    }

    @Override
    protected void doStop() throws Exception {
        Timer t = new Timer();
        this.notifyState(SenderState.unavailable);
        DelegateSenderService.getInstance().unregist(this);
        if (this.threadPool != null) {
            ExecutorMonitor.getInstance().unregist(this.threadPool);
            this.threadPool.shutdown();
            this.threadPool.awaitTermination(this.getStopTimeout() / 2L, TimeUnit.MILLISECONDS);
            if (!this.threadPool.isTerminated()) {
                List<Runnable> list = this.threadPool.shutdownNow();
                if (list != null && list.size() > 0) {
                    logger.warn("threadPool {} stopped, {} request maybe discard", (Object)this.name, (Object)list.size());
                } else {
                    logger.info("threadPool {} stopped safely", (Object)this.name);
                }
            } else {
                logger.info("threadPool {} stopped safely", (Object)this.name);
            }
        }
        super.doStop();
        logger.info("sender {} stopped, cost {} ms", (Object)this.name, (Object)t.cost());
    }

    protected abstract void requestAsync(Request var1, RequestListener var2);

    protected Runnable newSenderTask(Request request, RequestListener listener) {
        return new DefaultSenderTask(request, listener);
    }

    public ThreadPoolExecutor getThreadPool() {
        return this.threadPool;
    }

    public void setThreadPool(ThreadPoolExecutor threadPool) {
        if (this.isRunning()) {
            throw new IllegalStateException("can not set threadpool after sender started");
        }
        this.threadPool = threadPool;
    }

    public FlowControler getFlowControler() {
        return this.flowControler;
    }

    public void setFlowControler(FlowControler flowControler) {
        this.flowControler = flowControler;
    }

    private class DefaultSenderTask
    implements Runnable {
        private Request request;
        private RequestListener listener;

        public DefaultSenderTask(Request request, RequestListener listener) {
            this.request = request;
            this.listener = listener;
        }

        @Override
        public void run() {
            try {
                if (AbstractSender.this.isRunning()) {
                    AbstractSender.this.flowControler.control(this.request.getFlowNumber());
                    AbstractSender.this.requestAsync(this.request, this.listener);
                } else {
                    this.listener.requestException(this.request, Err.IllegalState.makeFault());
                }
            }
            catch (InterruptedException e) {
                this.listener.requestException(this.request, Err.IllegalState.makeFault());
            }
            catch (Throwable t) {
                logger.error("what's wrong?", t);
                this.listener.requestException(this.request, Err.InternalError.makeFault(t));
            }
        }
    }
}

