聊聊storm tuple的序列化

本文主要研究一下storm tuple的序列化

ExecutorTransfer.tryTransfer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java

// Every executor has an instance of this class
public class ExecutorTransfer {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);

    private final WorkerState workerData;
    private final KryoTupleSerializer serializer;
    private final boolean isDebug;
    private int indexingBase = 0;
    private ArrayList<JCQueue> localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker
    private AtomicReferenceArray<JCQueue> queuesToFlush;
        // [taskId-indexingBase] => queue, some entries can be null. : outbound Qs for this executor instance


    public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) {
        this.workerData = workerData;
        this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext());
        this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
    }

    //......

    // adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null)
    public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
        if (isDebug) {
            LOG.info("TRANSFERRING tuple {}", addressedTuple);
        }

        JCQueue localQueue = getLocalQueue(addressedTuple);
        if (localQueue != null) {
            return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
        }
        return workerData.tryTransferRemote(addressedTuple, pendingEmits, serializer);
    }

    //......
}
  • ExecutorTransfer在构造器里头创建了KryoTupleSerializer
  • 这里先判断目标地址是否是在localQueue中,如果是则进行local transfer,否则进行remote transfer
  • remote transfer的时候调用了workerData.tryTransferRemote,并传递了serializer

WorkerState.tryTransferRemote

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java

    /* Not a Blocking call. If cannot emit, will add 'tuple' to pendingEmits and return 'false'. 'pendingEmits' can be null */
    public boolean tryTransferRemote(AddressedTuple tuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
        return workerTransfer.tryTransferRemote(tuple, pendingEmits, serializer);
    }
  • WorkerState.tryTransferRemote实际上使用的是workerTransfer.tryTransferRemote

workerTransfer.tryTransferRemote

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java

    /* Not a Blocking call. If cannot emit, will add 'tuple' to 'pendingEmits' and return 'false'. 'pendingEmits' can be null */
    public boolean tryTransferRemote(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
        if (pendingEmits != null && !pendingEmits.isEmpty()) {
            pendingEmits.add(addressedTuple);
            return false;
        }

        if (!remoteBackPressureStatus[addressedTuple.dest].get()) {
            TaskMessage tm = new TaskMessage(addressedTuple.getDest(), serializer.serialize(addressedTuple.getTuple()));
            if (transferQueue.tryPublish(tm)) {
                return true;
            }
        } else {
            LOG.debug("Noticed Back Pressure in remote task {}", addressedTuple.dest);
        }
        if (pendingEmits != null) {
            pendingEmits.add(addressedTuple);
        }
        return false;
    }
  • 这里可以看到创建TaskMessage的时候,使用serializer.serialize(addressedTuple.getTuple())对tuple进行了序列化;该serializer为ITupleSerializer类型,它的实现类为KryoTupleSerializer

KryoTupleSerializer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java

public class KryoTupleSerializer implements ITupleSerializer {
    KryoValuesSerializer _kryo;
    SerializationFactory.IdDictionary _ids;
    Output _kryoOut;

    public KryoTupleSerializer(final Map<String, Object> conf, final GeneralTopologyContext context) {
        _kryo = new KryoValuesSerializer(conf);
        _kryoOut = new Output(2000, 2000000000);
        _ids = new SerializationFactory.IdDictionary(context.getRawTopology());
    }

    public byte[] serialize(Tuple tuple) {
        try {

            _kryoOut.clear();
            _kryoOut.writeInt(tuple.getSourceTask(), true);
            _kryoOut.writeInt(_ids.getStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()), true);
            tuple.getMessageId().serialize(_kryoOut);
            _kryo.serializeInto(tuple.getValues(), _kryoOut);
            return _kryoOut.toBytes();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    //    public long crc32(Tuple tuple) {
    //        try {
    //            CRC32OutputStream hasher = new CRC32OutputStream();
    //            _kryo.serializeInto(tuple.getValues(), hasher);
    //            return hasher.getValue();
    //        } catch (IOException e) {
    //            throw new RuntimeException(e);
    //        }
    //    }
}
  • KryoTupleSerializer创建了KryoValuesSerializer,在serialize tuple的时候调用了_kryo.serializeInto(tuple.getValues(), _kryoOut)

KryoValuesSerializer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesSerializer.java

public class KryoValuesSerializer {
    Kryo _kryo;
    ListDelegate _delegate;
    Output _kryoOut;

    public KryoValuesSerializer(Map<String, Object> conf) {
        _kryo = SerializationFactory.getKryo(conf);
        _delegate = new ListDelegate();
        _kryoOut = new Output(2000, 2000000000);
    }

    public void serializeInto(List<Object> values, Output out) {
        // this ensures that list of values is always written the same way, regardless
        // of whether it's a java collection or one of clojure's persistent collections 
        // (which have different serializers)
        // Doing this lets us deserialize as ArrayList and avoid writing the class here
        _delegate.setDelegate(values);
        _kryo.writeObject(out, _delegate);
    }

    public byte[] serialize(List<Object> values) {
        _kryoOut.clear();
        serializeInto(values, _kryoOut);
        return _kryoOut.toBytes();
    }

    public byte[] serializeObject(Object obj) {
        _kryoOut.clear();
        _kryo.writeClassAndObject(_kryoOut, obj);
        return _kryoOut.toBytes();
    }
}
  • KryoValuesSerializer在构造器里头调用SerializationFactory.getKryo(conf)方法创建_kryo
  • 这里的_delegate使用的是ListDelegate(即用它来包装一下List<Object> values),_kryoOut为new Output(2000, 2000000000)
  • serialize方法调用的是serializeInto方法,该方法最后调用的是原生的_kryo.writeObject方法进行序列化

SerializationFactory.getKryo

storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java

    public static Kryo getKryo(Map<String, Object> conf) {
        IKryoFactory kryoFactory = (IKryoFactory) ReflectionUtils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY));
        Kryo k = kryoFactory.getKryo(conf);
        k.register(byte[].class);

        /* tuple payload serializer is specified via configuration */
        String payloadSerializerName = (String) conf.get(Config.TOPOLOGY_TUPLE_SERIALIZER);
        try {
            Class serializerClass = Class.forName(payloadSerializerName);
            Serializer serializer = resolveSerializerInstance(k, ListDelegate.class, serializerClass, conf);
            k.register(ListDelegate.class, serializer);
        } catch (ClassNotFoundException ex) {
            throw new RuntimeException(ex);
        }

        k.register(ArrayList.class, new ArrayListSerializer());
        k.register(HashMap.class, new HashMapSerializer());
        k.register(HashSet.class, new HashSetSerializer());
        k.register(BigInteger.class, new BigIntegerSerializer());
        k.register(TransactionAttempt.class);
        k.register(Values.class);
        k.register(org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class);
        k.register(org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class);
        k.register(ConsList.class);
        k.register(BackPressureStatus.class);

        synchronized (loader) {
            for (SerializationRegister sr : loader) {
                try {
                    sr.register(k);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }

        kryoFactory.preRegister(k, conf);

        boolean skipMissing = (Boolean) conf.get(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS);

        register(k, conf.get(Config.TOPOLOGY_KRYO_REGISTER), conf, skipMissing);

        kryoFactory.postRegister(k, conf);

        if (conf.get(Config.TOPOLOGY_KRYO_DECORATORS) != null) {
            for (String klassName : (List<String>) conf.get(Config.TOPOLOGY_KRYO_DECORATORS)) {
                try {
                    Class klass = Class.forName(klassName);
                    IKryoDecorator decorator = (IKryoDecorator) klass.newInstance();
                    decorator.decorate(k);
                } catch (ClassNotFoundException e) {
                    if (skipMissing) {
                        LOG.info("Could not find kryo decorator named " + klassName + ". Skipping registration...");
                    } else {
                        throw new RuntimeException(e);
                    }
                } catch (InstantiationException e) {
                    throw new RuntimeException(e);
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        kryoFactory.postDecorate(k, conf);

        return k;
    }

    public static void register(Kryo k, Object kryoRegistrations, Map<String, Object> conf, boolean skipMissing) {
        Map<String, String> registrations = normalizeKryoRegister(kryoRegistrations);
        for (Map.Entry<String, String> entry : registrations.entrySet()) {
            String serializerClassName = entry.getValue();
            try {
                Class klass = Class.forName(entry.getKey());
                Class serializerClass = null;
                if (serializerClassName != null) {
                    serializerClass = Class.forName(serializerClassName);
                }
                if (serializerClass == null) {
                    k.register(klass);
                } else {
                    k.register(klass, resolveSerializerInstance(k, klass, serializerClass, conf));
                }
            } catch (ClassNotFoundException e) {
                if (skipMissing) {
                    LOG.info("Could not find serialization or class for " + serializerClassName + ". Skipping registration...");
                } else {
                    throw new RuntimeException(e);
                }
            }
        }
    }
  • SerializationFactory.getKryo静态方法首先根据Config.TOPOLOGY_KRYO_FACTORY创建IKryoFactory,默认是org.apache.storm.serialization.DefaultKryoFactory
  • 之后通过IKryoFactory.getKryo创建Kryo,之后就是对Kryo进行一系列配置,这里注册了byte[].class、ListDelegate.class、ArrayList.class、HashMap.class、HashSet.class、BigInteger.class、TransactionAttempt.class、Values.class、org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class、org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class、ConsList.class、BackPressureStatus.class
  • ListDelegate.class为payload的容器,采用Config.TOPOLOGY_TUPLE_SERIALIZER(topology.tuple.serializer,默认是org.apache.storm.serialization.types.ListDelegateSerializer)配置的类进行序列化
  • Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS(topology.skip.missing.kryo.registrations,默认为false),当kryo找不到配置的要序列化的class对应serializers的时候,是抛出异常还是直接跳过注册;
  • 最后通过Config.TOPOLOGY_KRYO_DECORATORS(topology.kryo.decorators)加载自定义的serialization

DefaultKryoFactory

storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/DefaultKryoFactory.java

public class DefaultKryoFactory implements IKryoFactory {

    @Override
    public Kryo getKryo(Map<String, Object> conf) {
        KryoSerializableDefault k = new KryoSerializableDefault();
        k.setRegistrationRequired(!((Boolean) conf.get(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION)));
        k.setReferences(false);
        return k;
    }

    @Override
    public void preRegister(Kryo k, Map<String, Object> conf) {
    }

    public void postRegister(Kryo k, Map<String, Object> conf) {
        ((KryoSerializableDefault) k).overrideDefault(true);
    }

    @Override
    public void postDecorate(Kryo k, Map<String, Object> conf) {
    }

    public static class KryoSerializableDefault extends Kryo {
        boolean _override = false;

        public void overrideDefault(boolean value) {
            _override = value;
        }

        @Override
        public Serializer getDefaultSerializer(Class type) {
            if (_override) {
                return new SerializableSerializer();
            } else {
                return super.getDefaultSerializer(type);
            }
        }
    }
}
  • 这里从配置读取Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION(topology.fall.back.on.java.serialization),默认该值为true,则registrationRequired这里设置为false,即序列化的时候不要求该class必须在已注册的列表中

Kryo

kryo-4.0.2-sources.jar!/com/esotericsoftware/kryo/Kryo.java

    /** If the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is false, it is automatically registered
     * using the {@link Kryo#addDefaultSerializer(Class, Class) default serializer}.
     * @throws IllegalArgumentException if the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is true.
     * @see ClassResolver#getRegistration(Class) */
    public Registration getRegistration (Class type) {
        if (type == null) throw new IllegalArgumentException("type cannot be null.");

        Registration registration = classResolver.getRegistration(type);
        if (registration == null) {
            if (Proxy.isProxyClass(type)) {
                // If a Proxy class, treat it like an InvocationHandler because the concrete class for a proxy is generated.
                registration = getRegistration(InvocationHandler.class);
            } else if (!type.isEnum() && Enum.class.isAssignableFrom(type) && !Enum.class.equals(type)) {
                // This handles an enum value that is an inner class. Eg: enum A {b{}};
                registration = getRegistration(type.getEnclosingClass());
            } else if (EnumSet.class.isAssignableFrom(type)) {
                registration = classResolver.getRegistration(EnumSet.class);
            } else if (isClosure(type)) {
                registration = classResolver.getRegistration(ClosureSerializer.Closure.class);
            }
            if (registration == null) {
                if (registrationRequired) {
                    throw new IllegalArgumentException(unregisteredClassMessage(type));
                }
                if (warnUnregisteredClasses) {
                    warn(unregisteredClassMessage(type));
                }
                registration = classResolver.registerImplicit(type);
            }
        }
        return registration;
    }

    /** Registers the class using the lowest, next available integer ID and the {@link Kryo#getDefaultSerializer(Class) default
     * serializer}. If the class is already registered, no change will be made and the existing registration will be returned.
     * Registering a primitive also affects the corresponding primitive wrapper.
     * <p>
     * Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when
     * using this method. The order must be the same at deserialization as it was for serialization. */
    public Registration register (Class type) {
        Registration registration = classResolver.getRegistration(type);
        if (registration != null) return registration;
        return register(type, getDefaultSerializer(type));
    }

    /** Returns the best matching serializer for a class. This method can be overridden to implement custom logic to choose a
     * serializer. */
    public Serializer getDefaultSerializer (Class type) {
        if (type == null) throw new IllegalArgumentException("type cannot be null.");

        final Serializer serializerForAnnotation = getDefaultSerializerForAnnotatedType(type);
        if (serializerForAnnotation != null) return serializerForAnnotation;

        for (int i = 0, n = defaultSerializers.size(); i < n; i++) {
            DefaultSerializerEntry entry = defaultSerializers.get(i);
            if (entry.type.isAssignableFrom(type)) {
                Serializer defaultSerializer = entry.serializerFactory.makeSerializer(this, type);
                return defaultSerializer;
            }
        }

        return newDefaultSerializer(type);
    }

    /** Called by {@link #getDefaultSerializer(Class)} when no default serializers matched the type. Subclasses can override this
     * method to customize behavior. The default implementation calls {@link SerializerFactory#makeSerializer(Kryo, Class)} using
     * the {@link #setDefaultSerializer(Class) default serializer}. */
    protected Serializer newDefaultSerializer (Class type) {
        return defaultSerializer.makeSerializer(this, type);
    }

    /** Registers the class using the lowest, next available integer ID and the specified serializer. If the class is already
     * registered, the existing entry is updated with the new serializer. Registering a primitive also affects the corresponding
     * primitive wrapper.
     * <p>
     * Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when
     * using this method. The order must be the same at deserialization as it was for serialization. */
    public Registration register (Class type, Serializer serializer) {
        Registration registration = classResolver.getRegistration(type);
        if (registration != null) {
            registration.setSerializer(serializer);
            return registration;
        }
        return classResolver.register(new Registration(type, serializer, getNextRegistrationId()));
    }

    /** Returns the lowest, next available integer ID. */
    public int getNextRegistrationId () {
        while (nextRegisterID != -2) {
            if (classResolver.getRegistration(nextRegisterID) == null) return nextRegisterID;
            nextRegisterID++;
        }
        throw new KryoException("No registration IDs are available.");
    }
  • Kryo的getRegistration方法,当遇到class没有注册时会判断registrationRequired,如果为true,则抛出IllegalArgumentException;如果为false,则调用classResolver.registerImplicit进行隐式注册,同时如果warnUnregisteredClasses为true则会打印warning信息
  • Kryo的register方法如果没有指定Serializer时,会通过getDefaultSerializer获取最匹配的Serializer,如果从已经注册的defaultSerializers没匹配到,则调用newDefaultSerializer创建一个,这里可能存在无法创建的异常,会抛出IllegalArgumentException
  • register(Class type, Serializer serializer)方法最后是调用ClassResolver.register(Registration registration)方法,对于没有Registration的,这里new了一个,同时通过getNextRegistrationId,给Registration分配一个id

DefaultClassResolver.register

kryo-4.0.2-sources.jar!/com/esotericsoftware/kryo/util/DefaultClassResolver.java

    static public final byte NAME = -1;

    protected final IntMap<Registration> idToRegistration = new IntMap();
    protected final ObjectMap<Class, Registration> classToRegistration = new ObjectMap();
    protected IdentityObjectIntMap<Class> classToNameId;

    public Registration registerImplicit (Class type) {
        return register(new Registration(type, kryo.getDefaultSerializer(type), NAME));
    }

    public Registration register (Registration registration) {
        if (registration == null) throw new IllegalArgumentException("registration cannot be null.");
        if (registration.getId() != NAME) {
            if (TRACE) {
                trace("kryo", "Register class ID " + registration.getId() + ": " + className(registration.getType()) + " ("
                    + registration.getSerializer().getClass().getName() + ")");
            }
            idToRegistration.put(registration.getId(), registration);
        } else if (TRACE) {
            trace("kryo", "Register class name: " + className(registration.getType()) + " ("
                + registration.getSerializer().getClass().getName() + ")");
        }
        classToRegistration.put(registration.getType(), registration);
        if (registration.getType().isPrimitive()) classToRegistration.put(getWrapperClass(registration.getType()), registration);
        return registration;
    }

    public Registration writeClass (Output output, Class type) {
        if (type == null) {
            if (TRACE || (DEBUG && kryo.getDepth() == 1)) log("Write", null);
            output.writeVarInt(Kryo.NULL, true);
            return null;
        }
        Registration registration = kryo.getRegistration(type);
        if (registration.getId() == NAME)
            writeName(output, type, registration);
        else {
            if (TRACE) trace("kryo", "Write class " + registration.getId() + ": " + className(type));
            output.writeVarInt(registration.getId() + 2, true);
        }
        return registration;
    }

    protected void writeName (Output output, Class type, Registration registration) {
        output.writeVarInt(NAME + 2, true);
        if (classToNameId != null) {
            int nameId = classToNameId.get(type, -1);
            if (nameId != -1) {
                if (TRACE) trace("kryo", "Write class name reference " + nameId + ": " + className(type));
                output.writeVarInt(nameId, true);
                return;
            }
        }
        // Only write the class name the first time encountered in object graph.
        if (TRACE) trace("kryo", "Write class name: " + className(type));
        int nameId = nextNameId++;
        if (classToNameId == null) classToNameId = new IdentityObjectIntMap();
        classToNameId.put(type, nameId);
        output.writeVarInt(nameId, true);
        output.writeString(type.getName());
    }

    public void reset () {
        if (!kryo.isRegistrationRequired()) {
            if (classToNameId != null) classToNameId.clear(2048);
            if (nameIdToClass != null) nameIdToClass.clear();
            nextNameId = 0;
        }
    }
  • DefaultClassResolver.register(Registration registration)方法里头针对registration的id进行了判断,如果是NAME(这里用-1表示)则注册到ObjectMap<Class, Registration> classToRegistration,如果有id不是NAME的,则注册到IntMap<Registration> idToRegistration
  • 前面提到如果registrationRequired是false,则调用classResolver.registerImplicit进行隐式注册,这里可以看到registerImplicit注册的registration的id是NAME
  • registration的id是NAME与否具体在writeClass中有体现(如果要序列化的类的字段中不仅仅有基本类型,还有未注册的类,会调用这里的writeClass方法),从代码可以看到如果是NAME,则使用的是writeName;不是NAME的则直接使用output.writeVarInt(registration.getId() + 2, true),写入int;writeName方法第一次遇到NAME的class时会给它生成一个nameId,然后放入到IdentityObjectIntMap<Class> classToNameId中,然后写入int,再写入class.getName,第二次遇到该class的时候,由于classToNameId中已经存在nameId,因而直接写入int;但是DefaultClassResolver的reset方法在registrationRequired是false这种情况下会调用classToNameId.clear(2048),进行清空或者resize,这个时候一旦这个方法被调用,那么下次可能无法利用classToNameId用id替代className来序列化。

Kryo.writeObject

kryo-4.0.2-sources.jar!/com/esotericsoftware/kryo/Kryo.java

    /** Writes an object using the registered serializer. */
    public void writeObject (Output output, Object object) {
        if (output == null) throw new IllegalArgumentException("output cannot be null.");
        if (object == null) throw new IllegalArgumentException("object cannot be null.");
        beginObject();
        try {
            if (references && writeReferenceOrNull(output, object, false)) {
                getRegistration(object.getClass()).getSerializer().setGenerics(this, null);
                return;
            }
            if (TRACE || (DEBUG && depth == 1)) log("Write", object);
            getRegistration(object.getClass()).getSerializer().write(this, output, object);
        } finally {
            if (--depth == 0 && autoReset) reset();
        }
    }

    /** Resets unregistered class names, references to previously serialized or deserialized objects, and the
     * {@link #getGraphContext() graph context}. If {@link #setAutoReset(boolean) auto reset} is true, this method is called
     * automatically when an object graph has been completely serialized or deserialized. If overridden, the super method must be
     * called. */
    public void reset () {
        depth = 0;
        if (graphContext != null) graphContext.clear();
        classResolver.reset();
        if (references) {
            referenceResolver.reset();
            readObject = null;
        }

        copyDepth = 0;
        if (originalToCopy != null) originalToCopy.clear(2048);

        if (TRACE) trace("kryo", "Object graph complete.");
    }
  • 这里要注意一下,writeObject方法在finally的时候判断如果depth为0且autoReset为true,会调用reset方法;而reset方法会调用classResolver.reset(),清空nameIdToClass以及classToNameId(classToNameId.clear(2048))

小结

  • storm默认是用kryo来进行tuple的序列化,storm额外注册了byte[].class、ListDelegate.class、ArrayList.class、HashMap.class、HashSet.class、BigInteger.class、TransactionAttempt.class、Values.class、org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class、org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class、ConsList.class、BackPressureStatus.class等类型
  • Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION(topology.fall.back.on.java.serialization)如果为true,则kryo.setRegistrationRequired(false),也就是如果一个class没有在kryo进行注册,不会抛异常;这个命名可能存在歧义(不是使用java自身的序列化机制来进行fallback),它实际上要表达的是对于遇到没有注册的class要不要fallback,如果不fallback则直接抛异常,如果fallback,则会进行隐式注册,在classToNameId不会被reset的前提下,第一次使用className来序列化,同时分配一个id写入classToNameId,第二次则直接使用classToNameId中获取到的id,也就相当于手工注册的效果
  • Config.TOPOLOGY_TUPLE_SERIALIZER(topology.tuple.serializer,默认是org.apache.storm.serialization.types.ListDelegateSerializer)用于配置tuple的payload的序列化类
  • Config.TOPOLOGY_KRYO_DECORATORS(topology.kryo.decorators)用于加载自定义的serialization,可以直接通过Config.registerDecorator注册一个IKryoDecorator,在decorate方法中对Kyro注册要序列化的class
  • Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS(topology.skip.missing.kryo.registrations,默认为false)这个属性容易跟Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION(topology.fall.back.on.java.serialization)混淆起来,前者是storm自身的属性而后者storm包装的kryo的属性(registrationRequired);Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS配置的是在有自定义Config.TOPOLOGY_KRYO_DECORATORS的场景下,如果storm加载不到用户自定义的IKryoDecorator类时是skip还是抛异常
  • Kryo的registrationRequired为false的话,则会自动对未注册的class进行隐式注册(注册到classToNameId),只在第一次序列化的时候使用className,之后都用id替代,来节省空间;不过要注意的是如果Kryo的autoReset为true的话,那么classToNameId会被reset,因而隐式注册在非第一次遇到未注册的class的时候并不能一直走使用id代替className来序列化

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,594评论 18 139
  • Date: Nov 17-24, 2017 1. 目的 积累Storm为主的流式大数据处理平台对实时数据处理的相关...
    一只很努力爬树的猫阅读 2,156评论 0 4
  • 本文主要介绍storm中的基本概念,从基础上了解strom的体系结构,便于后续编程过程中作为基础指导。主要的概念包...
    看山远兮阅读 1,501评论 0 9
  • 015年03月19日 23:28:19 本文主要是参照strom的管网中的document中来进行安装,管网地址 ...
    AIOPstack阅读 775评论 0 1
  • 造型,与其说鬼斧神工,不如説仁者见仁智者见智。李健吾《雨中登泰山》:“一般庙宇的塑像,往往不是平板,就是怪诞,造型...
    书香剑气巴士阅读 550评论 1 3