国内最全IT社区平台 联系我们 | 收藏本站
您当前位置:首页 > php开源 > 综合技术 > Flume之监控


来源:程序员人生   发布时间:2016-06-03 07:59:26 阅读次数:15252次







bin/flume-ng agent --conf conf --conf-file conf/flume_conf.properties --name collect -Dflume.monitoring.type=http -Dflume.monitoring.port=1234



bin/flume-ng agent --conf conf --conf-file conf/producer.properties --name collect -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=ip:port
















/**
 * Starts every component of the given configuration and then loads monitoring.
 *
 * <p>Order of operations: all channels are handed to the supervisor, the
 * method then blocks until each channel has either reached
 * {@link LifecycleState#START} or is reported by the supervisor as being in
 * an error state, and only afterwards are sink runners and finally source
 * runners handed to the supervisor. Every component is supervised with an
 * {@code AlwaysRestartPolicy} and a desired state of {@code START}.
 *
 * <p>A failure while submitting a single component is logged and does not
 * prevent the remaining components from being started.
 *
 * @param materializedConfiguration the configuration whose channels, sink
 *     runners and source runners are to be started; it is also stored in
 *     {@code this.materializedConfiguration}
 */
private void startAllComponents(
    MaterializedConfiguration materializedConfiguration) {
  logger.info("Starting new configuration:{}", materializedConfiguration);

  this.materializedConfiguration = materializedConfiguration;

  for (Entry<String, Channel> entry : materializedConfiguration
      .getChannels().entrySet()) {
    try {
      logger.info("Starting Channel " + entry.getKey());
      supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(),
          LifecycleState.START);
    } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
    }
  }

  /*
   * Wait for all channels to start.
   */
  for (Channel ch : materializedConfiguration.getChannels().values()) {
    while (ch.getLifecycleState() != LifecycleState.START
        && !supervisor.isComponentInErrorState(ch)) {
      try {
        logger.info("Waiting for channel: " + ch.getName()
            + " to start. Sleeping for 500 ms");
        Thread.sleep(500);
      } catch (InterruptedException e) {
        logger.error(
            "Interrupted while waiting for channel to start.", e);
        // Restore the interrupt flag so that callers further up the stack
        // can still observe the interruption after the exception is wrapped.
        Thread.currentThread().interrupt();
        Throwables.propagate(e);
      }
    }
  }

  for (Entry<String, SinkRunner> entry : materializedConfiguration
      .getSinkRunners().entrySet()) {
    try {
      logger.info("Starting Sink " + entry.getKey());
      supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(),
          LifecycleState.START);
    } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
    }
  }

  for (Entry<String, SourceRunner> entry : materializedConfiguration
      .getSourceRunners().entrySet()) {
    try {
      logger.info("Starting Source " + entry.getKey());
      supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(),
          LifecycleState.START);
    } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
    }
  }

  this.loadMonitoring();
}


private void loadMonitoring() { Properties systemProps = System.getProperties(); Set<String> keys = systemProps.stringPropertyNames(); try { if (keys.contains(CONF_MONITOR_CLASS)) { String monitorType = systemProps .getProperty(CONF_MONITOR_CLASS); Class<? extends MonitorService> klass; try { // Is it a known type? klass = MonitoringType.valueOf(monitorType.toUpperCase()) .getMonitorClass(); } catch (Exception e) { // Not a known type, use FQCN klass = (Class<? extends MonitorService>) Class .forName(monitorType); } this.monitorServer = klass.newInstance(); Context context = new Context(); for (String key : keys) { if (key.startsWith(CONF_MONITOR_PREFIX)) { context.put( key.substring(CONF_MONITOR_PREFIX.length()), systemProps.getProperty(key)); } } monitorServer.configure(context); monitorServer.start(); } } catch (Exception e) { logger.warn("Error starting monitoring. " + "Monitoring might not be available.", e); } }




public class HTTPMetricsServer implements MonitorService { private Server jettyServer; private int port; private static Logger LOG = LoggerFactory.getLogger(HTTPMetricsServer.class); public static int DEFAULT_PORT = 41414; public static String CONFIG_PORT = "port"; @Override public void start() { jettyServer = new Server(); //We can use Contexts etc if we have many urls to handle. For one url, //specifying a handler directly is the most efficient. SelectChannelConnector connector = new SelectChannelConnector(); connector.setReuseAddress(true); connector.setPort(port); jettyServer.setConnectors(new Connector[] {connector}); jettyServer.setHandler(new HTTPMetricsHandler()); try { jettyServer.start(); while (!jettyServer.isStarted()) { Thread.sleep(500); } } catch (Exception ex) { LOG.error("Error starting Jetty. JSON Metrics may not be available.", ex); } } @Override public void stop() { try { jettyServer.stop(); jettyServer.join(); } catch (Exception ex) { LOG.error("Error stopping Jetty. JSON Metrics may not be available.", ex); } } @Override public void configure(Context context) { port = context.getInteger(CONFIG_PORT, DEFAULT_PORT); } private class HTTPMetricsHandler extends AbstractHandler { Type mapType = new TypeToken<Map<String, Map<String, String>>>() { }.getType(); Gson gson = new Gson(); @Override public void handle(String target, HttpServletRequest request, HttpServletResponse response, int dispatch) throws IOException, ServletException { // /metrics is the only place to pull metrics. //If we want to use any other url for something else, we should make sure //that for metrics only /metrics is used to prevent backward //compatibility issues. 
if(request.getMethod().equalsIgnoreCase("TRACE") || request.getMethod() .equalsIgnoreCase("OPTIONS")) { response.sendError(HttpServletResponse.SC_FORBIDDEN); response.flushBuffer(); ((Request) request).setHandled(true); return; } if (target.equals("/")) { response.setContentType("text/html;charset=utf⑻"); response.setStatus(HttpServletResponse.SC_OK); response.getWriter().write("For Flume metrics please click" + " <a href = \"./metrics\"> here</a>."); response.flushBuffer(); ((Request) request).setHandled(true); return; } else if (target.equalsIgnoreCase("/metrics")) { response.setContentType("application/json;charset=utf⑻"); response.setStatus(HttpServletResponse.SC_OK); Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans(); String json = gson.toJson(metricsMap, mapType); response.getWriter().write(json); response.flushBuffer(); ((Request) request).setHandled(true); return; } response.sendError(HttpServletResponse.SC_NOT_FOUND); response.flushBuffer(); //Not handling the request returns a Not found error page. } } }

通过源码我们可以看到 Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans(); 具体的数据都是从这条语句得来的。再仔细看可以得知,这些监控数据是通过 JMX 的方式得到的。至于里面具体实现的细节,相对比较复杂,同时也不属于我们讨论的范畴,所以这里不讨论这块。


public abstract class MonitoredCounterGroup { private static final Logger logger = LoggerFactory.getLogger(MonitoredCounterGroup.class); // Key for component's start time in MonitoredCounterGroup.counterMap private static final String COUNTER_GROUP_START_TIME = "start.time"; // key for component's stop time in MonitoredCounterGroup.counterMap private static final String COUNTER_GROUP_STOP_TIME = "stop.time"; private final Type type; private final String name; private final Map<String, AtomicLong> counterMap; private AtomicLong startTime; private AtomicLong stopTime; private volatile boolean registered = false; protected MonitoredCounterGroup(Type type, String name, String... attrs) { this.type = type; this.name = name; Map<String, AtomicLong> counterInitMap = new HashMap<String, AtomicLong>(); // Initialize the counters for (String attribute : attrs) { counterInitMap.put(attribute, new AtomicLong(0L)); } counterMap = Collections.unmodifiableMap(counterInitMap); startTime = new AtomicLong(0L); stopTime = new AtomicLong(0L); } /** * Starts the component * * Initializes the values for the stop time as well as all the keys in the * internal map to zero and sets the start time to the current time in * milliseconds since midnight January 1, 1970 UTC */ public void start() { register(); stopTime.set(0L); for (String counter : counterMap.keySet()) { counterMap.get(counter).set(0L); } startTime.set(System.currentTimeMillis()); logger.info("Component type: " + type + ", name: " + name + " started"); } /** * Registers the counter. * This method is exposed only for testing, and there should be no need for * any implementations to call this method directly. */ @VisibleForTesting void register() { if (!registered) { try { ObjectName objName = new ObjectName("org.apache.flume." 
+ type.name().toLowerCase() + ":type=" + this.name); if (ManagementFactory.getPlatformMBeanServer().isRegistered(objName)) { logger.debug("Monitored counter group for type: " + type + ", name: " + name + ": Another MBean is already registered with this name. " + "Unregistering that pre-existing MBean now..."); ManagementFactory.getPlatformMBeanServer().unregisterMBean(objName); logger.debug("Monitored counter group for type: " + type + ", name: " + name + ": Successfully unregistered pre-existing MBean."); } ManagementFactory.getPlatformMBeanServer().registerMBean(this, objName); logger.info("Monitored counter group for type: " + type + ", name: " + name + ": Successfully registered new MBean."); registered = true; } catch (Exception ex) { logger.error("Failed to register monitored counter group for type: " + type + ", name: " + name, ex); } } } /** * Shuts Down the Component * * Used to indicate that the component is shutting down. * * Sets the stop time and then prints out the metrics from * the internal map of keys to values for the following components: * * - ChannelCounter * - ChannelProcessorCounter * - SinkCounter * - SinkProcessorCounter * - SourceCounter */ public void stop() { // Sets the stopTime for the component as the current time in milliseconds stopTime.set(System.currentTimeMillis()); // Prints out a message indicating that this component has been stopped logger.info("Component type: " + type + ", name: " + name + " stopped"); // Retrieve the type for this counter group final String typePrefix = type.name().toLowerCase(); // Print out the startTime for this component logger.info("Shutdown Metric for type: " + type + ", " + "name: " + name + ". " + typePrefix + "." + COUNTER_GROUP_START_TIME + " == " + startTime); // Print out the stopTime for this component logger.info("Shutdown Metric for type: " + type + ", " + "name: " + name + ". " + typePrefix + "." 
+ COUNTER_GROUP_STOP_TIME + " == " + stopTime); // Retrieve and sort counter group map keys final List<String> mapKeys = new ArrayList<String>(counterMap.keySet()); Collections.sort(mapKeys); // Cycle through and print out all the key value pairs in counterMap for (final String counterMapKey : mapKeys) { // Retrieves the value from the original counterMap. final long counterMapValue = get(counterMapKey); logger.info("Shutdown Metric for type: " + type + ", " + "name: " + name + ". " + counterMapKey + " == " + counterMapValue); } } /** * Returns when this component was first started * * @return */ public long getStartTime() { return startTime.get(); } /** * Returns when this component was stopped * * @return */ public long getStopTime() { return stopTime.get(); } @Override public final String toString() { StringBuilder sb = new StringBuilder(type.name()).append(":"); sb.append(name).append("{"); boolean first = true; Iterator<String> counterIterator = counterMap.keySet().iterator(); while (counterIterator.hasNext()) { if (first) { first = false; } else { sb.append(", "); } String counterName = counterIterator.next(); sb.append(counterName).append("=").append(get(counterName)); } sb.append("}"); return sb.toString(); } /** * Retrieves the current value for this key * * @param counter The key for this metric * @return The current value for this key */ protected long get(String counter) { return counterMap.get(counter).get(); } /** * Sets the value for this key to the given value * * @param counter The key for this metric * @param value The new value for this key */ protected void set(String counter, long value) { counterMap.get(counter).set(value); } /** * Atomically adds the delta to the current value for this key * * @param counter The key for this metric * @param delta * @return The updated value for this key */ protected long addAndGet(String counter, long delta) { return counterMap.get(counter).addAndGet(delta); } /** * Atomically increments the current value for 
this key by one * * @param counter The key for this metric * @return The updated value for this key */ protected long increment(String counter) { return counterMap.get(counter).incrementAndGet(); } /** * Component Enum Constants * * Used by each component's constructor to distinguish which type the * component is. */ public static enum Type { SOURCE, CHANNEL_PROCESSOR, CHANNEL, SINK_PROCESSOR, SINK, INTERCEPTOR, SERIALIZER, OTHER }; public String getType(){ return type.name(); } }

初始化构造方法 protected MonitoredCounterGroup(Type type, String name, String... attrs):初始化组件类型和一些监控元素;





/**
 * Read-only management view of a source's counters.
 *
 * <p>The interface name ends in {@code MBean} and declares only getters, so
 * each method below corresponds to one read-only attribute named after the
 * getter without its {@code get} prefix. The meaning of each individual
 * counter is defined by the implementing class, not by this interface.
 */
public interface SourceCounterMBean {

  /** @return the current value of the {@code EventReceivedCount} attribute */
  long getEventReceivedCount();

  /** @return the current value of the {@code EventAcceptedCount} attribute */
  long getEventAcceptedCount();

  /** @return the current value of the {@code AppendReceivedCount} attribute */
  long getAppendReceivedCount();

  /** @return the current value of the {@code AppendAcceptedCount} attribute */
  long getAppendAcceptedCount();

  /** @return the current value of the {@code AppendBatchReceivedCount} attribute */
  long getAppendBatchReceivedCount();

  /** @return the current value of the {@code AppendBatchAcceptedCount} attribute */
  long getAppendBatchAcceptedCount();

  /** @return the value of the {@code StartTime} attribute */
  long getStartTime();

  /** @return the value of the {@code StopTime} attribute */
  long getStopTime();

  /** @return the value of the {@code Type} attribute */
  String getType();

  /** @return the current value of the {@code OpenConnectionCount} attribute */
  long getOpenConnectionCount();

  /** @return the value of the {@code Ip} attribute */
  String getIp();

  /** @return the value of the {@code Port} attribute */
  String getPort();
}







