Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F8394221
payload_codec.go
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
2 KB
Subscribers
None
payload_codec.go
View Options
/*
* Copyright (c) 2011 NeuStar, Inc.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* NeuStar, the Neustar logo and related names and logos are registered
* trademarks, service marks or tradenames of NeuStar, Inc. All other
* product names, company names, marks, logos and symbols may be trademarks
* of their respective owners.
*/
package
kafka
import
(
"bytes"
"compress/gzip"
// "log"
)
const
(
NO_COMPRESSION_ID
=
0
GZIP_COMPRESSION_ID
=
1
)
type
PayloadCodec
interface
{
// the 1 byte id of the codec
Id
()
byte
// encoder interface for compression implementation
Encode
(
data
[]
byte
)
[]
byte
// decoder interface for decompression implementation
Decode
(
data
[]
byte
)
[]
byte
}
// Default Codecs
var
DefaultCodecs
=
[]
PayloadCodec
{
new
(
NoCompressionPayloadCodec
),
new
(
GzipPayloadCodec
),
}
var
DefaultCodecsMap
=
codecsMap
(
DefaultCodecs
)
func
codecsMap
(
payloadCodecs
[]
PayloadCodec
)
map
[
byte
]
PayloadCodec
{
payloadCodecsMap
:=
make
(
map
[
byte
]
PayloadCodec
,
len
(
payloadCodecs
))
for
_
,
c
:=
range
payloadCodecs
{
payloadCodecsMap
[
c
.
Id
()]
=
c
}
return
payloadCodecsMap
}
// No compression codec, noop
type
NoCompressionPayloadCodec
struct
{
}
func
(
codec
*
NoCompressionPayloadCodec
)
Id
()
byte
{
return
NO_COMPRESSION_ID
}
func
(
codec
*
NoCompressionPayloadCodec
)
Encode
(
data
[]
byte
)
[]
byte
{
return
data
}
func
(
codec
*
NoCompressionPayloadCodec
)
Decode
(
data
[]
byte
)
[]
byte
{
return
data
}
// Gzip Codec
type
GzipPayloadCodec
struct
{
}
func
(
codec
*
GzipPayloadCodec
)
Id
()
byte
{
return
GZIP_COMPRESSION_ID
}
func
(
codec
*
GzipPayloadCodec
)
Encode
(
data
[]
byte
)
[]
byte
{
buf
:=
bytes
.
NewBuffer
([]
byte
{})
zipper
,
_
:=
gzip
.
NewWriterLevel
(
buf
,
gzip
.
BestSpeed
)
zipper
.
Write
(
data
)
zipper
.
Close
()
return
buf
.
Bytes
()
}
func
(
codec
*
GzipPayloadCodec
)
Decode
(
data
[]
byte
)
[]
byte
{
buf
:=
bytes
.
NewBuffer
([]
byte
{})
zipper
,
_
:=
gzip
.
NewReader
(
bytes
.
NewBuffer
(
data
))
unzipped
:=
make
([]
byte
,
100
)
for
{
n
,
err
:=
zipper
.
Read
(
unzipped
)
if
n
>
0
&&
err
==
nil
{
buf
.
Write
(
unzipped
[
0
:
n
])
}
else
{
break
}
}
zipper
.
Close
()
return
buf
.
Bytes
()
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Jun 4, 7:22 PM (4 d, 16 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3399125
Attached To
rPKAFKA Kafka Debian packaging
Event Timeline
Log In to Comment