spark序列化和反序列化对象的实现,java版本

1.定义一个项目

mvn archetype:generate -DgroupId=com.packt.samples -DartifactId=objectstream -Dversion=1.0.0 -DinteractiveMode=false -DarchetypeCatalog=internal

2.修改pom文件如下

[root@localhost objectstream]# cat pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.packt.samples</groupId>

  <artifactId>objectstream</artifactId>

  <packaging>jar</packaging>

  <version>1.0.0</version>

  <name>objectstream</name>

  <url>http://maven.apache.org</url>

  <dependencies>

    <dependency>

      <groupId>junit</groupId>

      <artifactId>junit</artifactId>

      <version>3.8.1</version>

      <scope>test</scope>

    </dependency>

  </dependencies>

<build>

  <plugins>

    <plugin>   

        <groupId>org.apache.maven.plugins</groupId>   

        <artifactId>maven-compiler-plugin</artifactId>   

        <configuration>   

            <source>1.8</source>   

            <target>1.8</target>   

        </configuration>   

    </plugin> 

    <plugin>

      <groupId>org.apache.maven.plugins</groupId>

      <artifactId>maven-assembly-plugin</artifactId>

      <version>2.4</version>

      <configuration>

        <descriptorRefs>

          <descriptorRef>jar-with-dependencies</descriptorRef>

        </descriptorRefs>

        <archive>

          <manifest>

            <addClasspath>true</addClasspath>

            <mainClass>com.packt.samples.App</mainClass>

          </manifest>

        </archive>

      </configuration>

      <executions>

        <execution>

          <id>assemble-all</id>

          <phase>package</phase>

          <goals>

            <goal>single</goal>

          </goals>

        </execution>

      </executions>

    </plugin>

  </plugins>

</build>

</project>

[root@localhost objectstream]#

3.入口类是 App

[root@localhost objectstream]# cat src/main/java/com/packt/samples/App.java

package com.packt.samples;

import java.nio.ByteBuffer;

/**

* Hello world!

*

*/

public class App

{

    public static void main( String[] args )

    {

        JavaSerializer ser = new JavaSerializer();

        SerializerInstance<Task> jsi = ser.newInstance();

        Task ti = new Task(1);

        ByteBuffer buf = jsi.serialize(ti);

        Task dd = (Task) jsi.deserialize(buf,Thread.currentThread().getContextClassLoader());

        System.out.println(dd);

    }

}

4.定义序列化类 JavaSerializer

package com.packt.samples;

public class JavaSerializer {

  public JavaSerializer() {

  }

  public SerializerInstance newInstance() {

    return new JavaSerializerInstance(100,true,Thread.currentThread().getContextClassLoader());

  }

}

5.负责真正序列化和反序列化的JavaSerializerInstance类

[root@localhost objectstream]# cat src/main/java/com/packt/samples/JavaSerializerInstance.java

package com.packt.samples;

import java.nio.ByteBuffer;

import java.io.OutputStream;

import java.io.InputStream;

public class JavaSerializerInstance<T> extends SerializerInstance<T> {

  public int counterReset;

  public boolean extraDebugInfo;

  public ClassLoader defaultClassLoader;

  public JavaSerializerInstance(int counterReset,boolean extraDebugInfo,ClassLoader defaultClassLoader) {

    this.counterReset = counterReset;

    this.extraDebugInfo = extraDebugInfo;

    this.defaultClassLoader = defaultClassLoader;

  }

  @Override

  public ByteBuffer serialize(T t) {

    ByteBufferOutputStream bos = new ByteBufferOutputStream();

    SerializationStream out = serializeStream(bos);

    out.writeObject(t);

    out.close();

    return bos.toByteBuffer();

  }

  @Override

  public T deserialize(ByteBuffer bytes) {

    ByteBufferInputStream bis = new ByteBufferInputStream(bytes);

    DeserializationStream<T> in = deserializeStream(bis);

    return (T) in.readObject();

  }

  @Override

  public T deserialize(ByteBuffer bytes, ClassLoader loader) {

    ByteBufferInputStream bis = new ByteBufferInputStream(bytes);

    DeserializationStream<T> in = deserializeStream(bis, loader);

    return in.readObject();

  }

  @Override

  public SerializationStream serializeStream(OutputStream s) {

    return new JavaSerializationStream<T>(s, counterReset, extraDebugInfo);

  }

  @Override

  public DeserializationStream deserializeStream(InputStream s) {

    return new JavaDeserializationStream<T>(s, defaultClassLoader);

  }

  public DeserializationStream deserializeStream(InputStream s, ClassLoader loader) {

    return new JavaDeserializationStream<T>(s, loader);

  }

}

6.读写相关的字节流对象

[root@localhost objectstream]# cat src/main/java/com/packt/samples/ByteBufferOutputStream.java

package com.packt.samples;

import java.io.ByteArrayOutputStream;

import java.io.IOException;

import java.nio.ByteBuffer;

class ByteBufferOutputStream extends ByteArrayOutputStream {

  public int capacity;

  public boolean closed = false;

  public ByteBufferOutputStream() {

    this(32);

  }

  public ByteBufferOutputStream(int capacity) {

    super(capacity);

  }

  public int getCount() {

    return count;

  }


  @Override

  public void write(int b) {

    super.write(b);

  }

  @Override

  public void write(byte[] b, int off, int len) {

    super.write(b, off, len);

  }

  @Override

  public void reset() {

    super.reset();

  }

  @Override

  public void close() {

    if (!closed) {

      try {

        super.close();

      } catch(IOException e) {

        System.out.println(e);

      }

      closed = true;

    }

  }

  public ByteBuffer toByteBuffer() {

    return ByteBuffer.wrap(buf, 0, count);

  }

}

[root@localhost objectstream]# cat src/main/java/com/packt/samples/ByteBufferInputStream.java

package com.packt.samples;

import java.nio.ByteBuffer;

import java.io.InputStream;

class ByteBufferInputStream extends InputStream {

  private ByteBuffer buffer;

  public ByteBufferInputStream(ByteBuffer buffer) {

    this.buffer = buffer;

  }

  @Override

  public int read() {

    if (buffer == null || buffer.remaining() == 0) {

      cleanUp();

      return -1;

    } else {

      return buffer.get() & 0xFF;

    }

  }

  @Override

  public int read(byte[] dest) {

    return read(dest, 0, dest.length);

  }

  @Override

  public int read(byte[] dest, int offset, int length) {

    if (buffer == null || buffer.remaining() == 0) {

      cleanUp();

      return -1;

    } else {

      int amountToGet = Math.min(buffer.remaining(), length);

      buffer.get(dest, offset, amountToGet);

      return amountToGet;

    }

  }

  @Override

  public long skip(long bytes) {

    if (buffer != null) {

      long amountToSkip =  Math.min(bytes, new Integer(buffer.remaining()).longValue());

      buffer.position(buffer.position() + new Long(amountToSkip).intValue());

      if (buffer.remaining() == 0) {

        cleanUp();

      }

      return amountToSkip;

    } else {

      return 0L;

    }

  }

  private void cleanUp() {

    if (buffer != null) {

      buffer = null;

    }

  }

}

6.对输入和输出流进行抽象的两个抽象类

[root@localhost objectstream]# cat src/main/java/com/packt/samples/DeserializationStream.java

package com.packt.samples;

import java.util.Iterator;

import java.io.Closeable;

import java.io.IOException;

import java.io.EOFException;

abstract class DeserializationStream<T> implements Closeable {

  abstract public T readObject();

  public T readKey() {

    return readObject();

  }

  public T readValue() {

    return readObject();

  }

  @Override

  abstract public void close();

  public Iterator<T> asIterator() {

    return new NextIterator<T>() {

      @Override

      protected T getNext() {

        T t = null;

        t = readObject();

        return t;

      }

      @Override

      protected void close() {

        DeserializationStream.this.close();

      }

    };

  }

  public Iterator<ObjectPair> asKeyValueIterator() {

    return new NextIterator<ObjectPair>() {

      @Override

      protected ObjectPair getNext() {

        ObjectPair op = null;

        op = new ObjectPair(readKey(), readValue());

        return op;

      }

      @Override

      protected void close() {

        DeserializationStream.this.close();

      }

    };

  }

}

[root@localhost objectstream]# cat src/main/java/com/packt/samples/SerializationStream.java

package com.packt.samples;

import java.util.Iterator;

import java.io.Closeable;

abstract class SerializationStream<T> implements Closeable {


  public abstract SerializationStream writeObject(T t);

  public SerializationStream writeKey(T key) {

    return writeObject(key);

  }


  public SerializationStream writeValue(T value) {

    return writeObject(value);

  }

  public abstract void flush();

  @Override

  public abstract void close();

  public SerializationStream writeAll(Iterator<T> iter) {

    while (iter.hasNext()) {

      writeObject(iter.next());

    }

    return this;

  }

}

7.两个抽象类的实现

[root@localhost objectstream]# cat src/main/java/com/packt/samples/JavaDeserializationStream.java

package com.packt.samples;

import java.io.InputStream;

import java.io.ObjectInputStream;

import java.io.ObjectStreamClass;

import java.io.IOException;

import java.util.Map;

import java.util.HashMap;

public class JavaDeserializationStream<T> extends DeserializationStream<T> {

  public InputStream in;

  public ClassLoader loader;

  public ObjectInputStream objIn;

  public Map<String,Class> primitiveMappings;

  public JavaDeserializationStream(InputStream in, ClassLoader loader) {

    this.in = in;

    this.loader = loader;

    try {

      this.objIn = new ObjectInputStream(in) {

        @Override

        public Class resolveClass(ObjectStreamClass desc) {

          Class cls = null;

          try {

            cls = Class.forName(desc.getName(), false, loader);

          } catch(ClassNotFoundException e) {

            System.out.println(e);

          }

          return cls;

        }

      };

    } catch(IOException e) {

      System.out.println(e);

    }

    primitiveMappings = new HashMap<String,Class>();

    primitiveMappings.put("boolean", Boolean.class);

    primitiveMappings.put("byte", Byte.class);

    primitiveMappings.put("char", Character.class);

    primitiveMappings.put("short", Short.class);

    primitiveMappings.put("int", Integer.class);

    primitiveMappings.put("long", Long.class);

    primitiveMappings.put("float", Float.class);

    primitiveMappings.put("double", Double.class);

    primitiveMappings.put("void", Void.class);

  }

  public T readObject() {

    T t = null;

    try {

      t = (T) objIn.readObject();

    } catch(IOException e) {

      System.out.println(e);

    } catch(ClassNotFoundException e) {

      System.out.println(e);

    }

    return t;

  }


  public void close() {

    try {

      objIn.close();

    } catch(IOException e) {

      System.out.println(e);

    }

  }

}

[root@localhost objectstream]# cat src/main/java/com/packt/samples/JavaSerializationStream.java package com.packt.samples;

import java.io.OutputStream;

import java.io.ObjectOutputStream;

import java.io.NotSerializableException;

import java.io.IOException;

class JavaSerializationStream<T> extends SerializationStream<T> {

  public OutputStream out;

  public int counterReset,counter;

  public boolean extraDebugInfo;

  public ObjectOutputStream objOut;

  public JavaSerializationStream(OutputStream out,int counterReset,boolean extraDebugInfo) {

    this.out = out;

    this.counterReset = counterReset;

    this.extraDebugInfo = extraDebugInfo;

    try {

      objOut = new ObjectOutputStream(out);

    } catch(IOException e) {

      System.out.println(e);

    }

    counter = 0;

  }

  public SerializationStream writeObject(T t) {

    try {

      objOut.writeObject(t);

    } catch(NotSerializableException e) {

      System.out.println(e);

    } catch(IOException e) {

      System.out.println(e);

    }

    counter += 1;

    if (counterReset > 0 && counter >= counterReset) {

      try {

        objOut.reset();

      } catch(IOException e) {

        System.out.println(e);

      }

      counter = 0;

    }

    return this;

  }

  public void flush() {

    try {

      objOut.flush();

    } catch(IOException e) {

      System.out.println(e);

    }

  }

  public void close() {

    try {

      objOut.close();

    } catch(IOException e) {

      System.out.println(e);

    }

  }

}

8.用来保存key 和 value的 类

[root@localhost objectstream]# cat src/main/java/com/packt/samples/ObjectPair.java

package com.packt.samples;

class ObjectPair<T> {

    private T key,value;

    public ObjectPair(T key,T value) {

        this.key = key;

        this.value = value;

    }

}

9.用于测试序列化的类

[root@localhost objectstream]# cat src/main/java/com/packt/samples/Task.java

package com.packt.samples;

import java.io.Serializable;

public class Task implements Serializable {

  public int id;

  public Task(int id) {

    this.id = id;

  }

  @Override

  public String toString() {

      return (new Integer(id)).toString();

  }

}

10.迭代器对象

[root@localhost objectstream]# cat src/main/java/com/packt/samples/NextIterator.java

package com.packt.samples;

import java.util.Iterator;

import java.util.NoSuchElementException;

abstract class NextIterator<U> implements Iterator<U> {

  private boolean gotNext = false;

  private U nextValue;

  private boolean closed = false;

  protected boolean finished = false;

  abstract protected U getNext();

  abstract protected void close();

  public void closeIfNeeded() {

    if (!closed) {

      closed = true;

      close();

    }

  }

  @Override

  public boolean hasNext() {

    if (!finished) {

      if (!gotNext) {

        nextValue = getNext();

        if (finished) {

          closeIfNeeded();

        }

        gotNext = true;

      }

    }

    return !finished;

  }

  @Override

  public U next() {

    if (!hasNext()) {

      throw new NoSuchElementException("End of stream");

    }

    gotNext = false;

    return nextValue;

  }

}


11.编译

mvn clean package -DskipTests

12.执行

[root@localhost objectstream]# java -jar target/objectstream-1.0.0-jar-with-dependencies.jar

1

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容